]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Sat, 11 Jan 2025 20:41:45 +0000 (21:41 +0100)
committerRoland Häder <roland@mxchange.org>
Sat, 11 Jan 2025 20:41:45 +0000 (21:41 +0100)
- ops, was conflicting name
- added debug message

fba/commands.py
fba/networks/mastodon.py

index ca921e292500dc3d92b70ea89fa8334b8ae7b729..673470b096a556bbc34338a504fff2cd6d4131c9 100644 (file)
@@ -305,63 +305,63 @@ def fetch_blocks(args: argparse.Namespace) -> int:
 
     logger.info("Checking %d entries ...", len(rows))
     for row in rows:
-        logger.debug("row[blocker]='%s',row[software]='%s',row[software]='%s',nodeinfo_url='%s'", row["blocker"], row["software"], row["origin"], row["nodeinfo_url"])
+        logger.debug("row[domain]='%s',row[software]='%s',row[software]='%s',nodeinfo_url='%s'", row["domain"], row["software"], row["origin"], row["nodeinfo_url"])
 
-        if not domain_helper.is_wanted(row["blocker"]):
-            logger.warning("row[blocker]='%s' is not wanted - SKIPPED!", row["blocker"])
+        if not domain_helper.is_wanted(row["domain"]):
+            logger.warning("row[domain]='%s' is not wanted - SKIPPED!", row["domain"])
             continue
-        elif not args.force_all and instances.is_recent(row["blocker"], "last_blocked"):
-            logger.debug("row[blocker]='%s' has recently been crawled - SKIPPED!", row["blocker"])
+        elif not args.force_all and instances.is_recent(row["domain"], "last_blocked"):
+            logger.debug("row[domain]='%s' has recently been crawled - SKIPPED!", row["domain"])
             continue
         elif row["software"] is None:
-            logger.warning("row[blocker]='%s' has no software set - SKIPPED!", row["blocker"])
+            logger.warning("row[domain]='%s' has no software set - SKIPPED!", row["domain"])
             continue
 
-        logger.debug("Setting last_blocked,has_obfuscation=false for row[blocker]='%s' ...", row["blocker"])
-        instances.set_last_blocked(row["blocker"])
-        instances.set_has_obfuscation(row["blocker"], False)
+        logger.debug("Setting last_blocked,has_obfuscation=false for row[domain]='%s' ...", row["domain"])
+        instances.set_last_blocked(row["domain"])
+        instances.set_has_obfuscation(row["domain"], False)
 
         # c.s isn't part of oliphant's "hidden" blocklists
-        if row["blocker"] == "chaos.social" or software_helper.is_relay(row["software"]) or blocklists.has(row["blocker"]):
-            logger.debug("Skipping row[blocker]='%s', run ./fba.py fetch_cs, fetch_oliphant, fetch_csv instead!", row["blocker"])
+        if row["domain"] == "chaos.social" or software_helper.is_relay(row["software"]) or blocklists.has(row["domain"]):
+            logger.debug("Skipping row[domain]='%s', run ./fba.py fetch_cs, fetch_oliphant, fetch_csv instead!", row["domain"])
             continue
 
-        logger.debug("Invoking federation.fetch_blocks(%s) ...", row["blocker"])
-        blocking = federation.fetch_blocks(row["blocker"])
+        logger.debug("Invoking federation.fetch_blocks(%s) ...", row["domain"])
+        blocking = federation.fetch_blocks(row["domain"])
 
-        logger.debug("row[blocker]='%s',row[software]='%s',blocking()=%d", row["blocker"], row["software"], len(blocking))
+        logger.debug("row[domain]='%s',row[software]='%s',blocking()=%d", row["domain"], row["software"], len(blocking))
         if len(blocking) == 0:
-            logger.debug("row[blocker]='%s',row[software]='%s' - fetching blocklist ...", row["blocker"], row["software"])
+            logger.debug("row[domain]='%s',row[software]='%s' - fetching blocklist ...", row["domain"], row["software"])
             if row["software"] == "pleroma":
-                blocking = pleroma.fetch_blocks(row["blocker"])
-                logger.debug("row[blocker]='%s' returned %d entries,row[software]='%s'", row["blocker"], len(blocking), row["software"])
+                blocking = pleroma.fetch_blocks(row["domain"])
+                logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
             elif row["software"] == "mastodon":
-                blocking = mastodon.fetch_blocks(row["blocker"])
-                logger.debug("row[blocker]='%s' returned %d entries,row[software]='%s'", row["blocker"], len(blocking), row["software"])
+                blocking = mastodon.fetch_blocks(row["domain"])
+                logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
             elif row["software"] == "lemmy":
-                blocking = lemmy.fetch_blocks(row["blocker"])
-                logger.debug("row[blocker]='%s' returned %d entries,row[software]='%s'", row["blocker"], len(blocking), row["software"])
+                blocking = lemmy.fetch_blocks(row["domain"])
+                logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
             elif row["software"] == "friendica":
-                blocking = friendica.fetch_blocks(row["blocker"])
-                logger.debug("row[blocker]='%s' returned %d entries,row[software]='%s'", row["blocker"], len(blocking), row["software"])
+                blocking = friendica.fetch_blocks(row["domain"])
+                logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
             elif row["software"] == "misskey":
-                blocking = misskey.fetch_blocks(row["blocker"])
-                logger.debug("row[blocker]='%s' returned %d entries,row[software]='%s'", row["blocker"], len(blocking), row["software"])
+                blocking = misskey.fetch_blocks(row["domain"])
+                logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
             else:
-                logger.warning("Unknown software: row[blocker]='%s',row[software]='%s'", row["blocker"], row["software"])
+                logger.warning("Unknown software: row[domain]='%s',row[software]='%s'", row["domain"], row["software"])
 
-        logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", row["blocker"], len(blocking))
-        instances.set_total_blocks(row["blocker"], blocking)
+        logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", row["domain"], len(blocking))
+        instances.set_total_blocks(row["domain"], blocking)
 
         blockdict = list()
         deobfuscated = obfuscated = 0
 
-        logger.info("Checking %d entries from row[blocker]='%s',row[software]='%s' ...", len(blocking), row["blocker"], row["software"])
+        logger.info("Checking %d entries from row[domain]='%s',row[software]='%s' ...", len(blocking), row["domain"], row["software"])
         for block in blocking:
             logger.debug("blocked='%s',block_level='%s',reason='%s'", block["blocked"], block["block_level"], block["reason"])
 
             if block["block_level"] in [None, ""]:
-                logger.warning("block_level is empty, row[blocker]='%s',blocked='%s'", block["blocker"], block["blocked"])
+                logger.warning("block_level is empty, row[domain]='%s',blocked='%s'", block["blocker"], block["blocked"])
                 continue
 
             logger.debug("blocked='%s',reason='%s' - BEFORE!", block["blocked"], block["reason"])
@@ -370,22 +370,22 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             logger.debug("blocked='%s',reason='%s' - AFTER!", block["blocked"], block["reason"])
 
             if block["blocked"] in [None, ""]:
-                logger.warning("block[blocked]='%s' is empty, row[blocker]='%s'", block["blocked"], row["blocker"])
+                logger.warning("block[blocked]='%s' is empty, row[domain]='%s'", block["blocked"], row["domain"])
                 continue
             elif not domain_helper.is_tld_wanted(block["blocked"]):
                 logger.warning("block[blocked]='%s' has an unwanted TLD - SKIPPED!", block["blocked"])
                 continue
             elif block["blocked"].find("*") >= 0:
-                logger.debug("row[blocker]='%s' uses '*' for obfuscating domains", row["blocker"])
-                instances.set_has_obfuscation(row["blocker"], True)
+                logger.debug("row[domain]='%s' uses '*' for obfuscating domains", row["domain"])
+                instances.set_has_obfuscation(row["domain"], True)
                 obfuscated = obfuscated + 1
 
                 # Some friendica servers also obscure domains without hash
-                obfuscated = instances.deobfuscate("*", block["blocked"], block["digest"] if "digest" in block else None)
+                records = instances.deobfuscate("*", block["blocked"], block["digest"] if "digest" in block else None)
 
-                logger.debug("obfuscated[]='%s'", type(obfuscated))
-                if obfuscated is None:
-                    logger.warning("Cannot deobfuscate block[blocked]='%s',row[blocker]='%s',row[software]='%s' - SKIPPED!", block["blocked"], row["blocker"], row["software"])
+                logger.debug("records[]='%s'", type(records))
+                if records is None:
+                    logger.warning("Cannot deobfuscate block[blocked]='%s',row[domain]='%s',row[software]='%s' - SKIPPED!", block["blocked"], row["domain"], row["software"])
 
                     if not obfuscation.is_added(block["blocked"]):
                         logger.debug("Invoking obfuscation.add(%s) ...", block["blocked"])
@@ -397,20 +397,20 @@ def fetch_blocks(args: argparse.Namespace) -> int:
                     continue
 
                 deobfuscated = deobfuscated + 1
-                block["blocked"]    = obfuscated["domain"]
-                row["origin"]       = obfuscated["origin"]
-                row["nodeinfo_url"] = obfuscated["nodeinfo_url"]
+                block["blocked"]    = records["domain"]
+                row["origin"]       = records["origin"]
+                row["nodeinfo_url"] = records["nodeinfo_url"]
             elif block["blocked"].find("?") >= 0:
-                logger.debug("row[blocker]='%s' uses '?' for obfuscating domains", row["blocker"])
-                instances.set_has_obfuscation(row["blocker"], True)
+                logger.debug("row[domain]='%s' uses '?' for obfuscating domains", row["domain"])
+                instances.set_has_obfuscation(row["domain"], True)
                 obfuscated = obfuscated + 1
 
                 # Some obscure them with question marks, not sure if that's dependent on version or not
-                obfuscated = instances.deobfuscate("?", block["blocked"], block["digest"] if "digest" in block else None)
+                records = instances.deobfuscate("?", block["blocked"], block["digest"] if "digest" in block else None)
 
-                logger.debug("obfuscated[]='%s'", type(obfuscated))
-                if obfuscated is None:
-                    logger.warning("Cannot deobfuscate block[blocked]='%s',row[blocker]='%s',row[software]='%s' - SKIPPED!", block["blocked"], row["blocker"], row["software"])
+                logger.debug("records[]='%s'", type(records))
+                if records is None:
+                    logger.warning("Cannot deobfuscate block[blocked]='%s',row[domain]='%s',row[software]='%s' - SKIPPED!", block["blocked"], row["domain"], row["software"])
 
                     if not obfuscation.is_added(block["blocked"]):
                         logger.debug("Invoking obfuscation.add(%s) ...", block["blocked"])
@@ -422,9 +422,9 @@ def fetch_blocks(args: argparse.Namespace) -> int:
                     continue
 
                 deobfuscated = deobfuscated + 1
-                block["blocked"]    = obfuscated["domain"]
-                row["origin"]       = obfuscated["origin"]
-                row["nodeinfo_url"] = obfuscated["nodeinfo_url"]
+                block["blocked"]    = records["domain"]
+                row["origin"]       = records["origin"]
+                row["nodeinfo_url"] = records["nodeinfo_url"]
             elif not validators.domain(block["blocked"], rfc_2782=True):
                 logger.warning("block[blocked]='%s' is not a valid domain - SKIPPED!", block["blocked"])
                 continue
@@ -451,15 +451,15 @@ def fetch_blocks(args: argparse.Namespace) -> int:
                 logger.debug("block[blocked]='%s' is accepted, not wanted here - SKIPPED!", block["blocked"])
                 continue
             elif not instances.is_registered(block["blocked"]):
-                logger.info("Fetching new instance block[blocked]'%s' for row[blocker]='%s' ...", block["blocked"], row["blocker"])
-                federation.fetch_instances(block["blocked"], row["blocker"], None, inspect.currentframe().f_code.co_name)
+                logger.info("Fetching new instance block[blocked]'%s' for row[domain]='%s' ...", block["blocked"], row["domain"])
+                federation.fetch_instances(block["blocked"], row["domain"], None, inspect.currentframe().f_code.co_name)
 
             logger.debug("Invoking blocks_helper.alias_block_level('%s') ... - BEFORE!", block["block_level"])
             block["block_level"] = blocks_helper.alias_block_level(block["block_level"])
             logger.debug("block[block_level]='%s' - AFTER!", block["block_level"])
 
-            if processing.block(row["blocker"], block["blocked"], block["reason"], block["block_level"]) and block["block_level"] in ["rejected", "suspended"] and _bot_enabled:
-                logger.debug("Appending block[blocked]'%s',reason='%s' for row[blocker]='%s' ...", block["blocked"], block["block_level"], row["blocker"])
+            if processing.block(row["domain"], block["blocked"], block["reason"], block["block_level"]) and block["block_level"] in ["rejected", "suspended"] and _bot_enabled:
+                logger.debug("Appending block[blocked]'%s',reason='%s' for row[domain]='%s' ...", block["blocked"], block["block_level"], row["domain"])
                 blockdict.append({
                     "blocked": block["blocked"],
                     "reason" : block["reason"],
@@ -468,22 +468,22 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             logger.debug("Invoking cookies.clear(%s) ...", block["blocked"])
             cookies.clear(block["blocked"])
 
-        logger.info("row[blocker]='%s' has %d obfuscated domain(s) and %d of them could be deobfuscated.", row["blocker"], obfuscated, deobfuscated)
-        instances.set_obfuscated_blocks(row["blocker"], obfuscated)
+        logger.info("row[domain]='%s' has %d obfuscated domain(s) and %d of them could be deobfuscated.", row["domain"], obfuscated, deobfuscated)
+        instances.set_obfuscated_blocks(row["domain"], obfuscated)
 
-        logger.debug("Flushing updates for row[blocker]='%s' ...", row["blocker"])
-        instances.update(row["blocker"])
+        logger.debug("Flushing updates for row[domain]='%s' ...", row["domain"])
+        instances.update(row["domain"])
 
         logger.debug("Invoking commit() ...")
         database.connection.commit()
 
-        logger.debug("Invoking cookies.clear(%s) ...", row["blocker"])
-        cookies.clear(row["blocker"])
+        logger.debug("Invoking cookies.clear(%s) ...", row["domain"])
+        cookies.clear(row["domain"])
 
         logger.debug("_bot_enabled='%s',blockdict()=%d'", _bot_enabled, len(blockdict))
         if _bot_enabled and len(blockdict) > 0:
-            logger.info("Sending bot POST for row[blocker]='%s',blockdict()=%d ...", row["blocker"], len(blockdict))
-            network.send_bot_post(row["blocker"], blockdict)
+            logger.info("Sending bot POST for row[domain]='%s',blockdict()=%d ...", row["domain"], len(blockdict))
+            network.send_bot_post(row["domain"], blockdict)
 
     logger.debug("Success! - EXIT!")
     return 0
index adc3055a98dd461d3f51157b47c51101fcdf71d9..dfa79beab356d2e6fe6e4ad0e1429ea152aec590 100644 (file)
@@ -34,14 +34,14 @@ logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 # Language mapping X -> English
-language_mapping = {
+_language_mapping = {
     # English -> English
     "Silenced instances"            : "Silenced servers",
     "Suspended instances"           : "Suspended servers",
     "Suspended servers"             : "Suspended servers",
     "Limited instances"             : "Limited servers",
     "Filtered media"                : "Filtered media",
-    # Mappuing German -> English
+    # German -> English
     "Gesperrte Server"              : "Suspended servers",
     "Gefilterte Medien"             : "Filtered media",
     "Stummgeschaltete Server"       : "Silenced servers",
@@ -61,6 +61,12 @@ language_mapping = {
     "Serveurs modérés"              : "Limited servers",
 }
 
+# Paths to check
+_paths = [
+    "/about/more",
+    "/about"
+]
+
 def fetch_blocks_from_about(domain: str) -> dict:
     logger.debug("domain='%s' - CALLED!", domain)
     domain_helper.raise_on(domain)
@@ -73,8 +79,8 @@ def fetch_blocks_from_about(domain: str) -> dict:
     # Init variables
     doc = None
 
-    logger.info("Fetching mastodon blocks from domain='%s'", domain)
-    for path in ["/about/more", "/about"]:
+    logger.info("Fetching mastodon blocks from domain='%s',_paths()=%d ...", domain, len(_paths))
+    for path in _paths:
         logger.debug("path='%s'", path)
         try:
             logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
@@ -111,9 +117,9 @@ def fetch_blocks_from_about(domain: str) -> dict:
         header_text = tidyup.reason(header.text)
 
         logger.debug("header_text='%s'", header_text)
-        if header_text in language_mapping:
+        if header_text in _language_mapping:
             logger.debug("Translating header_text='%s' ...", header_text)
-            header_text = language_mapping[header_text]
+            header_text = _language_mapping[header_text]
         else:
             logger.warning("header_text='%s' not found in language mapping table", header_text)
 
@@ -141,7 +147,7 @@ def fetch_blocks_from_about(domain: str) -> dict:
                     logger.warning("domain='%s' is empty,line='%s' - SKIPPED!", domain, line)
                     continue
 
-                logger.debug("Appending domain='%s',digest='%s',reason='%s' to blocklist header_text='%s' ...", domain, digest, reason, blocklist)
+                logger.debug("Appending domain='%s',digest='%s',reason='%s' to blocklist header_text='%s' ...", domain, digest, reason, header_text)
                 blocklist[header_text].append({
                     "domain": domain,
                     "digest": digest,
@@ -181,6 +187,7 @@ def fetch_blocks(domain: str) -> list:
             logger.debug("block_level='%s',blocklists()=%d", block_level, len(blocklists))
             for block in blocklists:
                 # Check type
+                logger.debug("block[]='%s'", type(block))
                 if not isinstance(block, dict):
                     logger.debug("block[]='%s' is of type 'dict' - SKIPPED!", type(block))
                     continue