elif block["block_level"] in ["accept", "accepted"]:
logger.debug("block[blocked]='%s' is accepted, not wanted here - SKIPPED!", block["blocked"])
continue
- elif instances.is_recent(block["blocked"]):
- logger.debug("block[blocked]='%s' has recently been crawled - SKIPPED!", block["blocked"])
- continue
elif not instances.is_registered(block["blocked"]):
logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", block["blocked"], blocker)
federation.fetch_instances(block["blocked"], blocker, None, inspect.currentframe().f_code.co_name)
block["block_level"] = blocks.alias_block_level(block["block_level"])
- if processing.block(blocker, block["blocked"], block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
+ if processing.block(blocker, block["blocked"], block["reason"], block["block_level"]) and block["block_level"] in ["reject", "suspend"] and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], blocker)
blockdict.append({
"blocked": block["blocked"],
blocks.add(blocker, blocked, reason, block_level)
added = True
else:
+ if reason not in [None, ""] and blocks.get_reason(blocker, blocked, block_level) is None:
+ logger.debug("Updating reason='%s' for blocker='%s',blocked='%s',block_level='%s' ...", reason, blocker, blocked, block_level)
+ blocks.update_reason(reason, blocker, blocked, block_level)
+
logger.debug("Updating last_seen for blocker='%s',blocked='%s',block_level='%s' ...", blocker, blocked, block_level)
blocks.update_last_seen(blocker, blocked, block_level)
if not isinstance(url, str):
raise ValueError(f"url[]='{url}' is not of type 'str'")
- elif url == "":
+ elif url in [None, ""]:
raise ValueError("Parameter 'url' is empty")
elif not validators.url(url):
raise ValueError(f"Parameter url='{url}' is not a valid URL")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
+def get_reason(blocker: str, blocked: str, block_level: str):
+ logger.debug("blocker='%s',blocked='%s',block_level='%s' - CALLED!", blocker, blocked, block_level)
+ domain_helper.raise_on(blocker)
+ domain_helper.raise_on(blocked)
+
+ if not isinstance(block_level, str):
+ raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
+ elif block_level == "":
+ raise ValueError("Parameter 'block_level' is empty")
+ elif block_level in ["accept", "suspend", "silence", "nsfw", "quarantined_instances"]:
+ raise ValueError(f"block_level='{block_level}' is not wanted.")
+ elif blacklist.is_blacklisted(blocker):
+ raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
+ elif blacklist.is_blacklisted(blocked):
+ raise Exception(f"blocked='{blocked}' is blacklisted but function invoked")
+ elif not is_instance_blocked(blocker, blocked, block_level):
+ raise Exception(f"blocker='{blocker}',blocked='{blocked}',block_level='{block_level}' is not registered but function is invoked")
+
+ database.cursor.execute(
+ "SELECT reason FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
+ [
+ blocker,
+ blocked,
+ block_level
+ ]
+ )
+
+ row = database.cursor.fetchone()
+ logger.debug("row[]='%s'", type(row))
+
+ logger.debug("row[reason]='%s' - EXIT!", row["reason"])
+ return row["reason"]
+
def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
logger.debug("reason='%s',blocker='%s',blocked='%s',block_level='%s' - CALLED!", reason, blocker, blocked, block_level)
domain_helper.raise_on(blocker)