]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Tue, 8 Aug 2023 19:48:58 +0000 (21:48 +0200)
committerRoland Häder <roland@mxchange.org>
Tue, 8 Aug 2023 19:48:58 +0000 (21:48 +0200)
- fba.commands.fetch_blocks() does the last checkup if the domain is valid et
  cetera, including deobfuscation

fba/commands.py
fba/models/blocks.py
fba/networks/friendica.py
fba/networks/lemmy.py
fba/networks/mastodon.py
fba/networks/misskey.py
fba/networks/pleroma.py

index 475de877fa216121ab5515bf1ce4c3398943b049..fb363fe6ea528cc506024621360683003ccbf62f 100644 (file)
@@ -306,51 +306,49 @@ def fetch_blocks(args: argparse.Namespace) -> int:
     logger.info("Checking %d entries ...", len(rows))
     for blocker, software, origin, nodeinfo_url in rows:
         logger.debug("blocker='%s',software='%s',origin='%s',nodeinfo_url='%s'", blocker, software, origin, nodeinfo_url)
-        blocker = tidyup.domain(blocker)
-        logger.debug("blocker='%s' - AFTER!", blocker)
 
-        if blocker == "":
-            logger.warning("blocker is now empty!")
-            continue
-        elif nodeinfo_url is None or nodeinfo_url == "":
-            logger.debug("blocker='%s',software='%s' has empty nodeinfo_url", blocker, software)
+        if nodeinfo_url is None:
+            logger.debug("blocker='%s',software='%s' has no nodeinfo_url set - SKIPPED!", blocker, software)
             continue
         elif not domain_helper.is_wanted(blocker):
-            logger.debug("blocker='%s' is not wanted - SKIPPED!", blocker)
+            logger.warning("blocker='%s' is not wanted - SKIPPED!", blocker)
             continue
 
-        logger.debug("blocker='%s'", blocker)
+        logger.debug("Setting last_blocked,has_obfuscation=false for blocker='%s' ...", blocker)
         instances.set_last_blocked(blocker)
         instances.set_has_obfuscation(blocker, False)
 
         blocking = list()
-        if software == "pleroma":
-            logger.info("blocker='%s',software='%s'", blocker, software)
-            blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
-            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
-        elif software == "mastodon":
-            logger.info("blocker='%s',software='%s'", blocker, software)
-            blocking = mastodon.fetch_blocks(blocker, nodeinfo_url)
-            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
-        elif software == "lemmy":
-            logger.info("blocker='%s',software='%s'", blocker, software)
-            blocking = lemmy.fetch_blocks(blocker, nodeinfo_url)
-            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
-        elif software == "friendica":
-            logger.info("blocker='%s',software='%s'", blocker, software)
-            blocking = friendica.fetch_blocks(blocker)
-            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
-        elif software == "misskey":
-            logger.info("blocker='%s',software='%s'", blocker, software)
-            blocking = misskey.fetch_blocks(blocker)
-            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
-        else:
-            logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
 
-        logger.debug("blocker='%s'", blocker)
         if blocker != "chaos.social":
+            logger.debug("blocker='%s',software='%s'", blocker, software)
+            if software == "pleroma":
+                logger.info("blocker='%s',software='%s'", blocker, software)
+                blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
+                logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
+            elif software == "mastodon":
+                logger.info("blocker='%s',software='%s'", blocker, software)
+                blocking = mastodon.fetch_blocks(blocker, nodeinfo_url)
+                logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
+            elif software == "lemmy":
+                logger.info("blocker='%s',software='%s'", blocker, software)
+                blocking = lemmy.fetch_blocks(blocker, nodeinfo_url)
+                logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
+            elif software == "friendica":
+                logger.info("blocker='%s',software='%s'", blocker, software)
+                blocking = friendica.fetch_blocks(blocker)
+                logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
+            elif software == "misskey":
+                logger.info("blocker='%s',software='%s'", blocker, software)
+                blocking = misskey.fetch_blocks(blocker)
+                logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
+            else:
+                logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
+
             logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", blocker, len(blocking))
             instances.set_total_blocks(blocker, blocking)
+        else:
+            logger.debug("Skipping chaos.social, run ./fba.py fetch_cs instead!")
 
         logger.info("Checking %d entries from blocker='%s',software='%s' ...", len(blocking), blocker, software)
         blockdict = list()
index 943db348cd6fd7c9471ed93b22a6ff14d078125d..0e445ea9581abea7402a9fd0eb3f89936206922f 100644 (file)
@@ -230,8 +230,9 @@ def alias_block_level(block_level: str) -> str:
         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
     elif block_level == "":
         raise ValueError("Parameter 'block_level' is empty")
-
-    if block_level == "silence":
+    elif block_level in ["accept", "accepted"]:
+        raise ValueError(f"Parameter block_level='{block_level}' is not accepted but function was invoked")
+    elif block_level == "silence":
         logger.debug("Block level 'silence' has been changed to 'silenced'")
         block_level = "silenced"
     elif block_level == "suspend":
index b1d34fc14d627c59da878ff48283ba459377896c..8e968c21588abf866c231bbcc59d0dfb681c0b99 100644 (file)
@@ -84,17 +84,6 @@ def fetch_blocks(domain: str) -> list:
         if blocked == "":
             logger.debug("line[]='%s' returned empty blocked domain - SKIPPED!", type(line))
             continue
-        elif not domain_helper.is_wanted(blocked):
-            logger.debug("blocked='%s' is not wanted - SKIPPED!", domain)
-            continue
-
-        logger.debug("blocked='%s',domain='%s' - BEFORE!", blocked, domain)
-        blocked = utils.deobfuscate(blocked, domain)
-
-        logger.debug("blocked[%s]='%s' - DEOBFUSCATED!", type(blocked), blocked)
-        if not domain_helper.is_wanted(blocked):
-            logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-            continue
 
         logger.debug("Appending blocked='%s',reason='%s'", blocked, reason)
         blocklist.append({
index 7b427fc91ca05db6a85238be3015bb2b76b41294..6a3ed10bac94969e40ca82a6edfa4a190bd6b6cb 100644 (file)
@@ -209,9 +209,6 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 if blocked == "":
                     logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0])
                     continue
-                elif not domain_helper.is_wanted(blocked):
-                    logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                    continue
 
                 logger.debug("Appending blocker='%s',blocked='%s',block_level='reject' ...", domain, blocked)
                 blocklist.append({
@@ -270,9 +267,6 @@ def fetch_instances(domain: str, origin: str) -> list:
                         if peer == "":
                             logger.debug("peer is empty - SKIPPED!")
                             continue
-                        elif not domain_helper.is_wanted(peer):
-                            logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
-                            continue
                         elif peer in peers:
                             logger.debug("peer='%s' already added - SKIPPED!", peer)
                             continue
@@ -366,9 +360,6 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
                 if peer == "":
                     logger.debug("peer is empty - SKIPPED!")
                     continue
-                elif not domain_helper.is_wanted(peer):
-                    logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
-                    continue
                 elif peer in peers:
                     logger.debug("peer='%s' already added - SKIPPED!", peer)
                     continue
index 368117661d7042e0ce181d89f1ec5c1741c4cb24..c1eb6b779d59a696438effb05c5a2a461aaa99b4 100644 (file)
@@ -26,6 +26,7 @@ from fba.helpers import tidyup
 
 from fba.http import network
 
+from fba.models import blocks
 from fba.models import instances
 
 logging.basicConfig(level=logging.INFO)
@@ -198,6 +199,9 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 elif "severity" not in block:
                     logger.warning("block()=%d does not contain element 'severity' - SKIPPED!", len(block))
                     continue
+                elif block["severity"] in ["accept", "accepted"]:
+                    logger.debug("block[domain]='%s' has unwanted severity level '%s' - SKIPPED!", block["domain"], block["severity"])
+                    continue
 
                 reason = tidyup.reason(block["comment"]) if "comment" in block and block["comment"] is not None and block["comment"] != "" else None
 
@@ -207,7 +211,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                     "blocked"    : block["domain"],
                     "hash"       : block["digest"],
                     "reason"     : reason,
-                    "block_level": block["severity"]
+                    "block_level": blocks.alias_block_level(block["severity"]),
                 })
         else:
             logger.debug("domain='%s' has no block list", domain)
index 4227b2c9729e89478c9aa43e57b83cad2965f0a8..7583a9e339a433cd7b991bea07ca0f41c09eb644 100644 (file)
@@ -106,9 +106,6 @@ def fetch_peers(domain: str) -> list:
             elif not isinstance(row["host"], str):
                 logger.warning("row[host][]='%s' is not of type 'str' - SKIPPED!", type(row['host']))
                 continue
-            elif not domain_helper.is_wanted(row["host"]):
-                logger.debug("row[host]='%s' is not wanted, domain='%s' - SKIPPED!", row['host'], domain)
-                continue
             elif row["host"] in peers:
                 logger.debug("Not adding row[host]='%s', already found - SKIPPED!", row['host'])
                 continue
index 5a52f9cd58e823cdb37497d01489a13d8888dad2..fceebf53e9380088c4e68275a2e0fa88ebecc7af 100644 (file)
@@ -130,17 +130,11 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                     if blocked == "":
                         logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level)
                         continue
-                    elif not domain_helper.is_wanted(blocked):
-                        logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                        continue
 
                     logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
                     blocked = utils.deobfuscate(blocked, domain)
 
                     logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
-                    if not domain_helper.is_wanted(blocked):
-                        logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                        continue
 
                     logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
                     blockdict.append({
@@ -163,17 +157,6 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
             if blocked == "":
                 logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                 continue
-            elif not domain_helper.is_wanted(blocked):
-                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                continue
-
-            logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
-            blocked = utils.deobfuscate(blocked, domain)
-
-            logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
-            if not domain_helper.is_wanted(blocked):
-                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                continue
 
             logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
             blockdict.append({
@@ -232,13 +215,6 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 if blocked == "":
                     logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                     continue
-                elif not domain_helper.is_wanted(blocked):
-                    logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                    continue
-
-                logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
-                blocked = utils.deobfuscate(blocked, domain)
-                logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
 
                 logger.debug("Checking %d blockdict records ...", len(blockdict))
                 for block in blockdict:
@@ -266,17 +242,6 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
             elif blocked == "":
                 logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                 continue
-            elif not domain_helper.is_wanted(blocked):
-                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                continue
-
-            logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
-            blocked = utils.deobfuscate(blocked, domain)
-
-            logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
-            if not domain_helper.is_wanted(blocked):
-                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
-                continue
 
             logger.debug("Checking %d blockdict records ...", len(blockdict))
             for block in blockdict:
@@ -300,14 +265,6 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
 
                 logger.debug("rows[%s]()=%d'", type(rows), len(rows))
                 for block in rows:
-                    logger.debug("Invoking utils.deobfuscate(%s, %s) ...", block["blocked"], domain)
-                    block["blocked"] = utils.deobfuscate(block["blocked"], domain)
-
-                    logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"])
-                    if not domain_helper.is_wanted(block["blocked"]):
-                        logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
-                        continue
-
                     logger.debug("Appending blocker='%s',block[blocked]='%s',block[reason]='%s',block_level='%s' ...",domain, block["blocked"], block["reason"], block_level)
                     blockdict.append({
                         "blocker"    : domain,
@@ -402,6 +359,10 @@ def fetch_blocks_from_about(domain: str) -> dict:
                 blocked = tidyup.domain(line.find_all("td")[0].text)
                 reason = tidyup.reason(line.find_all("td")[1].text)
 
+                if blocked is None or blocked == "":
+                    logger.debug("blocker='%s',block_level='%s': blocked is empty - SKIPPED!", blocker, block_level)
+                    continue
+
                 logger.debug("Appending block_level='%s',blocked='%s',reason='%s' ...", block_level, blocked, reason)
                 blocklist[block_level].append({
                     "blocked": blocked,