git.mxchange.org Git - fba.git/commitdiff
Continued:
author Roland Häder <roland@mxchange.org>
Sun, 15 Sep 2024 17:17:41 +0000 (19:17 +0200)
committer Roland Häder <roland@mxchange.org>
Sun, 15 Sep 2024 17:18:44 +0000 (19:18 +0200)
- introduced domain_helper.is_tld_wanted() which checks if the top-level domain
  of a domain name is wanted

fba/commands.py
fba/helpers/domain.py
fba/helpers/processing.py
fba/http/federation.py
fba/models/instances.py

index 539aee336415e5d9a50cfb9fb3e7c9f074683e5f..3c20a977e8462061f9f053cd225fcb9f18c3d525 100644 (file)
@@ -372,17 +372,8 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             if block["blocked"] in [None, ""]:
                 logger.warning("block[blocked]='%s' is empty, blocker='%s'", block["blocked"], blocker)
                 continue
-            elif block["blocked"].endswith(".onion"):
-                logger.debug("block[blocked]='%s' is a TOR .onion domain - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".i2p") and not config.get("allow_i2p_domain"):
-                logger.debug("block[blocked]='%s' is an I2P .onion domain - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".arpa"):
-                logger.debug("block[blocked]='%s' is a reverse IP address - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".tld"):
-                logger.debug("block[blocked]='%s' is a fake domain - SKIPPED!", block["blocked"])
+            elif not domain_helper.is_tld_wanted(block["blocked"]):
+                logger.warning("block[blocked]='%s' has an unwanted TLD - SKIPPED!", block["blocked"])
                 continue
             elif block["blocked"].find("*") >= 0:
                 logger.debug("blocker='%s' uses '*' for obfuscating domains", blocker)
@@ -439,17 +430,8 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             elif not validators.domain(block["blocked"], rfc_2782=True):
                 logger.warning("block[blocked]='%s' is not a valid domain - SKIPPED!", block["blocked"])
                 continue
-            elif block["blocked"].endswith(".onion"):
-                logger.debug("block[blocked]'%s' is a TOR .onion domain - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".i2p") and not config.get("allow_i2p_domain"):
-                logger.debug("block[blocked]'%s' is an I2P .onion domain - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".arpa"):
-                logger.debug("block[blocked]'%s' is a reverse IP address - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".tld"):
-                logger.debug("block[blocked]'%s' is a fake domain - SKIPPED!", block["blocked"])
+            elif not domain_helper.is_tld_wanted(block["blocked"]):
+                logger.debug("block[blocked]='%s' has an unwanted TLD - SKIPPED!", block["blocked"])
                 continue
 
             logger.debug("block[blocked]='%s' - BEFORE!", block["blocked"])
@@ -1425,17 +1407,8 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
             if block["blocked"] == "":
                 logger.debug("block[blocked] is empty - SKIPPED!")
                 continue
-            elif block["blocked"].endswith(".onion"):
-                logger.debug("block[blocked]='%s' is a TOR onion domain name - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".i2p") and not config.get("allow_i2p_domain"):
-                logger.debug("block[blocked]='%s' is an I2P onion domain name - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".arpa"):
-                logger.debug("block[blocked]='%s' is a reversed IP address - SKIPPED!", block["blocked"])
-                continue
-            elif block["blocked"].endswith(".tld"):
-                logger.debug("block[blocked]='%s' is a fake domain name - SKIPPED!", block["blocked"])
+            elif not domain_helper.is_tld_wanted(block["blocked"]):
+                logger.debug("block[blocked]='%s' has an unwanted TLD - SKIPPED!", block["blocked"])
                 continue
             elif block["blocked"].find("*") >= 0 or block["blocked"].find("?") >= 0:
                 logger.debug("block[blocked]='%s' is obfuscated.", block["blocked"])
@@ -1611,17 +1584,8 @@ def update_nodeinfo(args: argparse.Namespace) -> int:
     cnt = 0
     for row in domains:
         logger.debug("row[]='%s'", type(row))
-        if row["domain"].endswith(".i2p") and not config.get("allow_i2p_domain"):
-            logger.debug("row[domain]='%s' is an I2P address - SKIPPED!", row["domain"])
-            continue
-        elif row["domain"].endswith(".onion"):
-            logger.debug("row[domain]='%s' is a TOR .onion domain - SKIPPED!", row["domain"])
-            continue
-        elif row["domain"].endswith(".arpa"):
-            logger.debug("row[domain]='%s' is a reverse IP address - SKIPPED!", row["domain"])
-            continue
-        elif row["domain"].endswith(".tld"):
-            logger.debug("row[domain]='%s' is a fake domain - SKIPPED!", row["domain"])
+        if not domain_helper.is_tld_wanted(row["domain"]):
+            logger.debug("row[domain]='%s' has an unwanted TLD - SKIPPED!", row["domain"])
             continue
         elif blacklist.is_blacklisted(row["domain"]):
             logger.debug("row[domain]='%s' is blacklisted - SKIPPED!", row["domain"])
index 8b8bfd357d8ba40b136f23d95b4d17f74dc800fe..bf400644cce6658ae3c939eb614e0e2438d1f5f8 100644 (file)
@@ -79,6 +79,33 @@ def is_in_url(domain: str, url: str) -> bool:
     logger.debug("is_found='%s' - EXIT!", is_found)
     return is_found
 
+@lru_cache
+def is_tld_wanted(domain: str) -> bool:
+    logger.debug("domain='%s' - CALLED!", domain)
+
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+
+    wanted = True
+
+    if domain.endswith(".onion"):
+        logger.debug("domain='%s' is a TOR .onion domain - setting wanted=False ...", domain)
+        wanted = False
+    elif domain.endswith(".i2p") and not config.get("allow_i2p_domain"):
+        logger.debug("domain='%s' is an I2P .onion domain - setting wanted=False ...", domain)
+        wanted = False
+    elif domain.endswith(".arpa"):
+        logger.debug("domain='%s' is a reverse IP address - setting wanted=False ...", domain)
+        wanted = False
+    elif domain.endswith(".tld"):
+        logger.debug("domain='%s' is a fake domain - setting wanted=False ...", domain)
+        wanted = False
+
+    logger.debug("wanted='%s' - EXIT!", wanted)
+    return wanted
+
 @lru_cache
 def is_wanted(domain: str) -> bool:
     logger.debug("domain='%s' - CALLED!", domain)
@@ -95,17 +122,8 @@ def is_wanted(domain: str) -> bool:
     elif not validators.domain(domain.split("/")[0], rfc_2782=True):
         logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
         wanted = False
-    elif domain.endswith(".arpa"):
-        logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
-        wanted = False
-    elif domain.endswith(".onion"):
-        logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
-        wanted = False
-    elif domain.endswith(".i2p") and not config.get("allow_i2p_domain"):
-        logger.debug("domain='%s' is an I2P domain - setting False ...", domain)
-        wanted = False
-    elif domain.endswith(".tld"):
-        logger.debug("domain='%s' is a fake domain - setting False ...", domain)
+    elif not is_tld_wanted(domain):
+        logger.debug("domain='%s' has an unwanted TLD - setting False ...", domain)
         wanted = False
     elif blacklist.is_blacklisted(domain):
         logger.debug("domain='%s' is blacklisted - setting False ...", domain)
index 1999b2b78eb9e752688ca708b1777cf86aa68389..5631f3a64168f95baab74bec7a2e3668a90b2c10 100644 (file)
@@ -208,17 +208,8 @@ def csv_block(blocker: str, url: str, command: str) -> None:
         if domain in [None, ""]:
             logger.debug("domain='%s' is empty - SKIPPED!", domain)
             continue
-        elif domain.endswith(".onion"):
-            logger.debug("domain='%s' is a TOR .onion domain - SKIPPED!", domain)
-            continue
-        elif domain.endswith(".i2p") and not config.get("allow_i2p_domain"):
-            logger.debug("domain='%s' is an I2P .onion domain - SKIPPED!", domain)
-            continue
-        elif domain.endswith(".arpa"):
-            logger.debug("domain='%s' is a reverse IP address - SKIPPED!", domain)
-            continue
-        elif domain.endswith(".tld"):
-            logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
+        elif not domain_helper.is_tld_wanted(domain):
+            logger.debug("domain='%s' has an unwanted TLD - SKIPPED!", domain)
             continue
         elif domain.find("*") >= 0 or domain.find("?") >= 0:
             logger.debug("domain='%s' is obfuscated - Invoking utils.deobfuscate(%s, %s) ...", domain, domain, blocker)
index 241bdd801cebb38f759e7a2aea2f47091e64d4e7..544796e32c3243e070b837133a88e972bb861cc4 100644 (file)
@@ -176,17 +176,8 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path:
         if not validators.domain(probe, rfc_2782=True):
             logger.warning("probe='%s' is not a valid domain - SKIPPED!", probe)
             continue
-        elif probe.endswith(".onion"):
-            logger.debug("probe='%s' is a TOR .onion probe - SKIPPED!", probe)
-            continue
-        elif probe.endswith(".i2p") and not config.get("allow_i2p_domain"):
-            logger.debug("probe='%s' is an I2P .onion probe - SKIPPED!", probe)
-            continue
-        elif probe.endswith(".arpa"):
-            logger.debug("probe='%s' is a reverse IP address - SKIPPED!", probe)
-            continue
-        elif probe.endswith(".tld"):
-            logger.debug("probe='%s' is a fake probe - SKIPPED!", probe)
+        elif not domain_helper.is_tld_wanted(probe):
+            logger.debug("probe='%s' has an unwanted TLD - SKIPPED!", probe)
             continue
 
         logger.debug("instance='%s' - BEFORE!", instance)
@@ -544,8 +535,9 @@ def find_domains(tag: bs4.element.Tag, domainColumn: str = "dt", reasonColumn: s
         logger.debug("element[%s]='%s'", type(element), element)
         domain = tidyup.domain(element.text)
         reasons = element.find_next(reasonColumn).text.split(reasonText)[1].splitlines()
+
         logger.debug("reasons(%d)='%s'", len(reasons), reasons)
-        reason = None
+        reason = ""
         for r in reasons:
             logger.debug("r[%s]='%s'", type(r), r)
             if r != "":
index 45d2fb6d238251c38ac4f3f803508a9a7dc7b2d1..5ea702fc8f7ef557764ab752ef073a0d5e0908fb 100644 (file)
@@ -608,17 +608,8 @@ def translate_idnas(rows: list, column: str) -> None:
         elif not validators.domain(row[column].split("/")[0], rfc_2782=True):
             logger.warning("row[%s]='%s' is not valid domain - SKIPPED!", column, row[column])
             continue
-        elif row[column].endswith(".onion"):
-            logger.debug("row[%s]='%s' is a TOR .onion domain - SKIPPED!", column, row[column])
-            continue
-        elif row[column].endswith(".i2p") and not config.get("allow_i2p_domain"):
-            logger.debug("row[%s]='%s' is an I2P .onion domain - SKIPPED!", column, row[column])
-            continue
-        elif row[column].endswith(".arpa"):
-            logger.debug("row[%s]='%s' is a reverse IP address - SKIPPED!", column, row[column])
-            continue
-        elif row[column].endswith(".tld"):
-            logger.debug("row[%s]='%s' is a fake domain - SKIPPED!", column, row[column])
+        elif not domain_helper.is_tld_wanted(row[column]):
+            logger.debug("row[%s]='%s' has an unwanted TLD - SKIPPED!", column, row[column])
             continue
 
         punycode = domain_helper.encode_idna(row[column])