]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Mon, 10 Jul 2023 22:51:49 +0000 (00:51 +0200)
committerRoland Häder <roland@mxchange.org>
Mon, 10 Jul 2023 22:51:49 +0000 (00:51 +0200)
- added command convert_idna to convert UTF-8 encoded international domain
  names to punycode domains (IDNA), it caused some to be added in both
  encodings

fba/boot.py
fba/commands.py
fba/helpers/domain.py
fba/http/federation.py
fba/models/blocks.py
fba/models/instances.py
fba/utils.py

index ebde9f28c09a102418a54a255649463f69c5a0d3..15f29a10f87409ecd50c6078213d31b42ac5f873 100644 (file)
@@ -198,6 +198,13 @@ def init_parser():
     )
     parser.set_defaults(command=commands.fetch_instances_social)
 
+    ### Convert international domain names to punycode domains ###
+    parser = subparser_command.add_parser(
+        "convert_idna",
+        help="Convertes UTF-8 encoded international domain names into punycode (IDNA) domain names.",
+    )
+    parser.set_defaults(command=commands.convert_idna)
+
     logger.debug("EXIT!")
 
 def run_command():
index f80c89a09796d384e081b55b73bb661162963210..bd3d3938abaa381a12042645453c4d02a84797f6 100644 (file)
@@ -146,7 +146,12 @@ def fetch_pixelfed_api(args: argparse.Namespace) -> int:
             elif row["domain"] == "":
                 logger.debug("row[domain] is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(row["domain"]):
+
+            logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
+            row["domain"] = row["domain"].encode("idna").decode("utf-8")
+            logger.debug("row[domain]='%s' - AFTER!", row["domain"])
+
+            if not utils.is_domain_wanted(row["domain"]):
                 logger.warning("row[domain]='%s' is not wanted - SKIPPED!", row["domain"])
                 continue
             elif instances.is_registered(row["domain"]):
@@ -238,6 +243,10 @@ def fetch_bkali(args: argparse.Namespace) -> int:
     if len(domains) > 0:
         logger.info("Adding %d new instances ...", len(domains))
         for domain in domains:
+            logger.debug("domain='%s' - BEFORE!", domain)
+            domain = domain.encode("idna").decode("utf-8")
+            logger.debug("domain='%s' - AFTER!", domain)
+
             try:
                 logger.info("Fetching instances from domain='%s' ...", domain)
                 federation.fetch_instances(domain, 'tak.teleyal.blog', None, inspect.currentframe().f_code.co_name)
@@ -354,6 +363,8 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             elif block["blocked"].endswith(".tld"):
                 logger.debug("blocked='%s' is a fake domain - SKIPPED", block["blocked"])
                 continue
+            elif "xn--" in block["blocked"]:
+                raise ValueError(f"blocked='{block['blocked']}' is a punycode domain, please don't crawl them!")
             elif block["blocked"].find("*") >= 0:
                 logger.debug("blocker='%s' uses obfuscated domains", blocker)
 
@@ -389,7 +400,12 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             if block["blocked"] == "":
                 logger.debug("block[blocked] is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(block["blocked"]):
+
+            logger.debug("block[blocked]='%s' - BEFORE!", block["blocked"])
+            block["blocked"] = block["blocked"].encode("idna").decode("utf-8")
+            logger.debug("block[blocked]='%s' - AFTER!", block["blocked"])
+
+            if not utils.is_domain_wanted(block["blocked"]):
                 logger.warning("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
                 continue
             elif block["block_level"] in ["accept", "accepted"]:
@@ -501,12 +517,17 @@ def fetch_observer(args: argparse.Namespace) -> int:
         for item in items:
             logger.debug("item[]='%s'", type(item))
             domain = item.decode_contents()
-
             logger.debug("domain='%s' - AFTER!", domain)
+
             if domain == "":
                 logger.debug("domain is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(domain):
+
+            logger.debug("domain='%s' - BEFORE!", domain)
+            domain = domain.encode("idna").decode("utf-8")
+            logger.debug("domain='%s' - AFTER!", domain)
+
+            if not utils.is_domain_wanted(domain):
                 logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif instances.is_registered(domain):
@@ -673,7 +694,10 @@ def fetch_cs(args: argparse.Namespace):
 
             for row in blocklist[block_level]:
                 logger.debug("row[%s]='%s'", type(row), row)
-                if instances.is_recent(row["domain"], "last_blocked"):
+                if not "domain" in row:
+                    logger.warning("row[]='%s' has no element 'domain' - SKIPPED!", type(row))
+                    continue
+                elif instances.is_recent(row["domain"], "last_blocked"):
                     logger.debug("row[domain]='%s' has been recently crawled - SKIPPED!", row["domain"])
                     continue
                 elif not instances.is_registered(row["domain"]):
@@ -741,7 +765,12 @@ def fetch_fba_rss(args: argparse.Namespace) -> int:
             if domain == "":
                 logger.debug("domain is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(domain):
+
+            logger.debug("domain='%s' - BEFORE!", domain)
+            domain = domain.encode("idna").decode("utf-8")
+            logger.debug("domain='%s' - AFTER!", domain)
+
+            if not utils.is_domain_wanted(domain):
                 logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif domain in domains:
@@ -761,6 +790,7 @@ def fetch_fba_rss(args: argparse.Namespace) -> int:
     if len(domains) > 0:
         logger.info("Adding %d new instances ...", len(domains))
         for domain in domains:
+            logger.debug("domain='%s'", domain)
             try:
                 logger.info("Fetching instances from domain='%s' ...", domain)
                 federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
@@ -813,7 +843,12 @@ def fetch_fbabot_atom(args: argparse.Namespace) -> int:
                     if domain == "":
                         logger.debug("domain is empty - SKIPPED!")
                         continue
-                    elif not utils.is_domain_wanted(domain):
+
+                    logger.debug("domain='%s' - BEFORE!", domain)
+                    domain = domain.encode("idna").decode("utf-8")
+                    logger.debug("domain='%s' - AFTER!", domain)
+
+                    if not utils.is_domain_wanted(domain):
                         logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
                         continue
                     elif domain in domains:
@@ -885,7 +920,12 @@ def fetch_instances(args: argparse.Namespace) -> int:
         if row["domain"] == "":
             logger.debug("row[domain] is empty - SKIPPED!")
             continue
-        elif not utils.is_domain_wanted(row["domain"]):
+
+        logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
+        row["domain"] = row["domain"].encode("idna").decode("utf-8")
+        logger.debug("row[domain]='%s' - AFTER!", row["domain"])
+
+        if not utils.is_domain_wanted(row["domain"]):
             logger.warning("Domain row[domain]='%s' is not wanted - SKIPPED!", row["domain"])
             continue
 
@@ -1035,6 +1075,8 @@ def fetch_oliphant(args: argparse.Namespace) -> int:
             elif domain.endswith(".tld"):
                 logger.debug("domain='%s' is a fake domain - SKIPPED", domain)
                 continue
+            elif "xn--" in domain:
+                raise ValueError(f"domain='{domain}' is a punycode domain, please translate them back!")
             elif domain.find("*") >= 0 or domain.find("?") >= 0:
                 logger.debug("domain='%s' is obfuscated - Invoking utils.deobfuscate(%s, %s) ...", domain, domain, block["blocker"])
                 domain = utils.deobfuscate(domain, block["blocker"])
@@ -1171,7 +1213,12 @@ def fetch_fedipact(args: argparse.Namespace) -> int:
             if domain == "":
                 logger.debug("domain is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(domain):
+
+            logger.debug("domain='%s' - BEFORE!", domain)
+            domain = domain.encode("idna").decode("utf-8")
+            logger.debug("domain='%s' - AFTER!", domain)
+
+            if not utils.is_domain_wanted(domain):
                 logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif instances.is_registered(domain):
@@ -1420,6 +1467,8 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
             elif block["blocked"].endswith(".tld"):
                 logger.debug("blocked='%s' is a fake domain name - SKIPPED!", block["blocked"])
                 continue
+            elif "xn--" in block["blocked"]:
+                raise ValueError(f"blocked='{block['blocked']}' is a punycode domain, please translate them back!")
             elif block["blocked"].endswith(".onion"):
                 logger.debug("blocked='%s' is a TOR onion domain name - SKIPPED!", block["blocked"])
                 continue
@@ -1516,7 +1565,12 @@ def fetch_fedilist(args: argparse.Namespace) -> int:
         if domain == "":
             logger.debug("domain is empty after tidyup: row[hostname]='%s' - SKIPPED!", row["hostname"])
             continue
-        elif not utils.is_domain_wanted(domain):
+
+        logger.debug("domain='%s' - BEFORE!", domain)
+        domain = domain.encode("idna").decode("utf-8")
+        logger.debug("domain='%s' - AFTER!", domain)
+
+        if not utils.is_domain_wanted(domain):
             logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
             continue
         elif (args.all is None or not args.all) and instances.is_registered(domain):
@@ -1625,12 +1679,17 @@ def fetch_instances_social(args: argparse.Namespace) -> int:
     for row in rows:
         logger.debug("row[]='%s'", type(row))
         domain = tidyup.domain(row["name"])
-
         logger.debug("domain='%s' - AFTER!", domain)
+
         if domain == "":
             logger.debug("domain is empty - SKIPPED!")
             continue
-        elif not utils.is_domain_wanted(domain):
+
+        logger.debug("domain='%s' - BEFORE!", domain)
+        domain = domain.encode("idna").decode("utf-8")
+        logger.debug("domain='%s' - AFTER!", domain)
+
+        if not utils.is_domain_wanted(domain):
             logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
             continue
         elif domain in domains:
@@ -1648,3 +1707,33 @@ def fetch_instances_social(args: argparse.Namespace) -> int:
 
     logger.debug("Success! - EXIT!")
     return 0
+
+def convert_idna(args: argparse.Namespace) -> int:
+    logger.debug("args[]='%s' - CALLED!", type(args))
+
+    database.cursor.execute("SELECT domain FROM instances WHERE domain NOT LIKE '%xn--%' ORDER BY domain ASC")
+    rows = database.cursor.fetchall()
+
+    logger.debug("rows[]='%s'", type(rows))
+    instances.translate_idnas(rows, "domain")
+
+    database.cursor.execute("SELECT origin FROM instances WHERE origin NOT LIKE '%xn--%' ORDER BY origin ASC")
+    rows = database.cursor.fetchall()
+
+    logger.debug("rows[]='%s'", type(rows))
+    instances.translate_idnas(rows, "origin")
+
+    database.cursor.execute("SELECT blocker FROM blocks WHERE blocker NOT LIKE '%xn--%' ORDER BY blocker ASC")
+    rows = database.cursor.fetchall()
+
+    logger.debug("rows[]='%s'", type(rows))
+    blocks.translate_idnas(rows, "blocker")
+
+    database.cursor.execute("SELECT blocked FROM blocks WHERE blocked NOT LIKE '%xn--%' ORDER BY blocked ASC")
+    rows = database.cursor.fetchall()
+
+    logger.debug("rows[]='%s'", type(rows))
+    blocks.translate_idnas(rows, "blocked")
+
+    logger.debug("Success! - EXIT!")
+    return 0
index 5328e3f37eeddc8a25043b390249704a7486de82..75f97a5d3020b7de9ea406ba7f184c328b84bb36 100644 (file)
@@ -39,6 +39,8 @@ def raise_on(domain: str):
         raise ValueError(f"domain='{domain}' is a TOR, please don't crawl them!")
     elif domain.endswith(".tld"):
         raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    elif "xn--" in domain:
+        raise ValueError(f"domain='{domain}' is a punycode domain, please don't crawl them!")
 
     logger.debug("EXIT!")
 
index 8cc59053a85971656c8a363e5d2fb3803b1af7fa..1418755821c1b761eea59a6790e0c8db9e038331 100644 (file)
@@ -111,7 +111,12 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path:
         if instance == "":
             logger.warning("Empty instance after tidyup.domain(), domain='%s'", domain)
             continue
-        elif not utils.is_domain_wanted(instance):
+
+        logger.debug("instance='%s' - BEFORE!", instance)
+        instance = instance.encode("idna").decode("utf-8")
+        logger.debug("instance='%s' - AFTER!", instance)
+
+        if not utils.is_domain_wanted(instance):
             logger.debug("instance='%s' is not wanted - SKIPPED!", instance)
             continue
         elif instance.find("/profile/") > 0 or instance.find("/users/") > 0 or (instances.is_registered(instance.split("/")[0]) and instance.find("/c/") > 0):
index 6b8edefbc127c2780e16aff82b1e50472cb526ea..cad0f6cbe8b89aa3d327da884ebdbc3959491ddc 100644 (file)
@@ -194,3 +194,32 @@ def valid(value: str, column: str) -> bool:
 
     logger.debug("valid='%s' - EXIT!", valid)
     return valid
+
+def translate_idnas(rows: list, column: str):
+    logger.debug("rows[]='%s' - CALLED!", type(rows))
+    if not isinstance(rows, list):
+        raise ValueError(f"rows[]='%s' is not of type 'list'", type(rows))
+    elif len(rows) == 0:
+        raise ValueError("Parameter 'rows' is an empty list")
+    elif not isinstance(column, str):
+        raise ValueError(f"column='%s' is not of type 'str'", type(column))
+    elif column == "":
+        raise ValueError("Parameter 'column' is empty")
+    elif column not in ["blocker", "blocked"]:
+        raise ValueError(f"column='{column}' is not supported")
+
+    logger.info("Checking/converting %d domain names ...", len(rows))
+    for row in rows:
+        logger.debug("row[]='%s'", type(row))
+
+        translated = row[column].encode("idna").decode("utf-8")
+        logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+
+        if translated != row[column]:
+            logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
+            database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [translated, row[column]])
+
+            logger.debug("Invoking commit() ...")
+            database.connection.commit()
+
+    logger.debug("EXIT!")
index 7968ff236007aa4d086266dca30a4b001cd5ed06..1b8450499fbb7594c18ee738fef5fe2279751d68 100644 (file)
@@ -270,9 +270,13 @@ def set_success(domain: str):
 
     logger.debug("EXIT!")
 
-def is_registered(domain: str) -> bool:
-    logger.debug("domain='%s' - CALLED!", domain)
-    domain_helper.raise_on(domain)
+def is_registered(domain: str, skip_raise = False) -> bool:
+    logger.debug("domain='%s',skip_raise='%s' - CALLED!", domain, skip_raise)
+    if not isinstance(skip_raise, bool):
+        raise ValueError(f"skip_raise[]='%s' is not type of 'bool'", type(skip_raise))
+
+    if not skip_raise:
+        domain_helper.raise_on(domain)
 
     logger.debug("domain='%s' - CALLED!", domain)
     if not cache.key_exists("is_registered"):
@@ -474,3 +478,36 @@ def valid(value: str, column: str) -> bool:
 
     logger.debug("valid='%s' - EXIT!", valid)
     return valid
+
+def translate_idnas(rows: list, column: str):
+    logger.debug("rows[]='%s' - CALLED!", type(rows))
+    if not isinstance(rows, list):
+        raise ValueError(f"rows[]='%s' is not of type 'list'", type(rows))
+    elif len(rows) == 0:
+        raise ValueError("Parameter 'rows' is an empty list")
+    elif not isinstance(column, str):
+        raise ValueError(f"column='%s' is not of type 'str'", type(column))
+    elif column == "":
+        raise ValueError("Parameter 'column' is empty")
+    elif column not in ["domain", "origin"]:
+        raise ValueError(f"column='{column}' is not supported")
+
+    logger.info("Checking/converting %d domain names ...", len(rows))
+    for row in rows:
+        logger.debug("row[]='%s'", type(row))
+
+        translated = row[column].encode("idna").decode("utf-8")
+        logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+
+        if translated != row[column]:
+            logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
+            if is_registered(translated, True):
+                logger.warning("Deleting row[%s]='%s' as translated='%s' already exist", column, row[column], translated)
+                database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [row[column]])
+            else:
+                database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [translated, row[column]])
+
+            logger.debug("Invoking commit() ...")
+            database.connection.commit()
+
+    logger.debug("EXIT!")
index 727af2bc81e4c576d661f9e40f875387ff8db4ab..019938c4f6c89d14be0c3a91f29b33ec89b9a652 100644 (file)
@@ -127,11 +127,20 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
     for tag in tags:
         logger.debug("tag[]='%s'", type(tag))
         domain = tidyup.domain(tag.find(search).contents[0])
+        logger.debug("domain='%s' - AFTER!", domain)
 
-        logger.debug("domain='%s'", domain)
         if domain == "":
             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
             domain = tidyup.domain(tag.find("em").contents[0])
+            logger.debug("domain='%s' - AFTER!", domain)
+
+        if domain == "":
+            logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
+            continue
+
+        logger.debug("domain='%s' - BEFORE!", domain)
+        domain = domain.encode("idna").decode("utf-8")
+        logger.debug("domain='%s' - AFTER!", domain)
 
         if not is_domain_wanted(domain):
             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
@@ -165,6 +174,8 @@ def is_domain_wanted(domain: str) -> bool:
     elif domain.endswith(".tld"):
         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
         wanted = False
+    elif "xn--" in domain:
+        raise ValueError(f"domain='{domain}' is a punycode domain, please don't crawl them!")
     elif blacklist.is_blacklisted(domain):
         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
         wanted = False