git.mxchange.org Git - fba.git/commitdiff
Continued:
author Roland Häder <roland@mxchange.org>
Wed, 10 Jul 2024 16:34:05 +0000 (18:34 +0200)
committer Roland Häder <roland@mxchange.org>
Wed, 10 Jul 2024 16:34:05 +0000 (18:34 +0200)
- introduced function domain_helper.encode_idna() which centralizes IDNA
  encoding

fba/commands.py
fba/helpers/domain.py
fba/http/federation.py
fba/models/blocks.py
fba/models/instances.py
fba/utils.py

index 36c1b2effada1a0480b0cd1b5caa56e0a8db9017..c64c7136d7a79342c0775dba236cdf5a34443949 100644 (file)
@@ -89,7 +89,7 @@ def check_nodeinfo(args: argparse.Namespace) -> int:
     cnt = 0
     for row in database.cursor.fetchall():
         logger.debug("Checking row[domain]='%s',row[software]='%s',row[nodeinfo_url]='%s' ...", row["domain"], row["software"], row["nodeinfo_url"])
-        punycode = row["domain"].encode("idna").decode("utf-8")
+        punycode = domain_helper.encode_idna(row["domain"])
 
         if row["nodeinfo_url"].startswith("/"):
             logger.debug("row[nodeinfo_url]='%s' is a relative URL and always matches", row["nodeinfo_url"])
@@ -153,7 +153,7 @@ def fetch_pixelfed_api(args: argparse.Namespace) -> int:
                 continue
 
             logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
-            domain = row["domain"].encode("idna").decode("utf-8")
+            domain = domain_helper.encode_idna(row["domain"])
             logger.debug("domain='%s' - AFTER!", domain)
 
             if not domain_helper.is_wanted(domain):
@@ -252,7 +252,7 @@ def fetch_bkali(args: argparse.Namespace) -> int:
         logger.info("Adding %d new instances ...", len(domains))
         for domain in domains:
             logger.debug("domain='%s' - BEFORE!", domain)
-            domain = domain.encode("idna").decode("utf-8")
+            domain = domain_helper.encode_idna(domain)
             logger.debug("domain='%s' - AFTER!", domain)
 
             try:
@@ -426,7 +426,7 @@ def fetch_blocks(args: argparse.Namespace) -> int:
                 continue
 
             logger.debug("block[blocked]='%s' - BEFORE!", block["blocked"])
-            block["blocked"] = block["blocked"].lstrip(".").encode("idna").decode("utf-8")
+            block["blocked"] = domain_helper.encode_idna(block["blocked"])
             logger.debug("block[blocked]='%s' - AFTER!", block["blocked"])
 
             if not domain_helper.is_wanted(block["blocked"]):
@@ -576,7 +576,7 @@ def fetch_observer(args: argparse.Namespace) -> int:
                 continue
 
             logger.debug("domain='%s' - BEFORE!", domain)
-            domain = domain.encode("idna").decode("utf-8")
+            domain = domain_helper.encode_idna(domain)
             logger.debug("domain='%s' - AFTER!", domain)
 
             if not domain_helper.is_wanted(domain):
@@ -836,7 +836,7 @@ def fetch_fba_rss(args: argparse.Namespace) -> int:
                 continue
 
             logger.debug("domain='%s' - BEFORE!", domain)
-            domain = domain.encode("idna").decode("utf-8")
+            domain = domain_helper.encode_idna(domain)
             logger.debug("domain='%s' - AFTER!", domain)
 
             if not domain_helper.is_wanted(domain):
@@ -923,7 +923,7 @@ def fetch_fbabot_atom(args: argparse.Namespace) -> int:
                         continue
 
                     logger.debug("domain='%s' - BEFORE!", domain)
-                    domain = domain.encode("idna").decode("utf-8")
+                    domain = domain_helper.encode_idna(domain)
                     logger.debug("domain='%s' - AFTER!", domain)
 
                     if not domain_helper.is_wanted(domain):
@@ -1039,7 +1039,7 @@ ORDER BY total_peers DESC, last_response_time ASC, last_updated ASC"
     logger.info("Checking %d entries ...", len(rows))
     for row in rows:
         logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
-        domain = row["domain"].encode("idna").decode("utf-8")
+        domain = domain_helper.encode_idna(row["domain"])
         logger.debug("domain='%s' - AFTER!", domain)
 
         if not domain_helper.is_wanted(domain):
@@ -1192,7 +1192,7 @@ def fetch_fedipact(args: argparse.Namespace) -> int:
                 continue
 
             logger.debug("domain='%s' - BEFORE!", domain)
-            domain = domain.encode("idna").decode("utf-8")
+            domain = domain_helper.encode_idna(domain)
             logger.debug("domain='%s' - AFTER!", domain)
 
             if not domain_helper.is_wanted(domain):
@@ -1514,7 +1514,7 @@ def fetch_fedilist(args: argparse.Namespace) -> int:
             continue
 
         logger.debug("domain='%s' - BEFORE!", domain)
-        domain = domain.encode("idna").decode("utf-8")
+        domain = domain_helper.encode_idna(domain)
         logger.debug("domain='%s' - AFTER!", domain)
 
         if not domain_helper.is_wanted(domain):
@@ -1678,7 +1678,7 @@ def fetch_instances_social(args: argparse.Namespace) -> int:
             continue
 
         logger.debug("domain='%s' - BEFORE!", domain)
-        domain = domain.encode("idna").decode("utf-8")
+        domain = domain_helper.encode_idna(domain)
         logger.debug("domain='%s' - AFTER!", domain)
 
         if not domain_helper.is_wanted(domain):
@@ -1747,7 +1747,7 @@ def fetch_relaylist(args: argparse.Namespace) -> int:
             continue
 
         logger.debug("domain='%s' - BEFORE!", domain)
-        domain = domain.encode("idna").decode("utf-8")
+        domain = domain_helper.encode_idna(domain)
         logger.debug("domain='%s' - AFTER!", domain)
 
         if not domain_helper.is_wanted(domain):
index fcf672a68a090ae69b29640ee3f98e2bc30ffb4d..b3b6f511ead8496576bac230d435f6dd43988d13 100644 (file)
@@ -18,6 +18,7 @@ import logging
 
 from functools import lru_cache
 from urllib.parse import urlparse
+from urllib.parse import urlunparse
 
 import validators
 
@@ -67,7 +68,8 @@ def is_in_url(domain: str, url: str) -> bool:
     elif not validators.url(url):
         raise ValueError(f"Parameter url='{url}' is not a valid URL")
 
-    punycode = domain.encode("idna").decode("utf-8")
+    punycode = encode_idna(domain)
+    logger.debug("punycode='%s'", punycode)
 
     components = urlparse(url)
     logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
@@ -117,3 +119,39 @@ def is_wanted(domain: str) -> bool:
 
     logger.debug("wanted='%s' - EXIT!", wanted)
     return wanted
+
+@lru_cache
+def encode_idna(domain: str) -> str:
+    """
+    Encode a domain (optionally followed by a path) to its IDNA/punycode form.
+
+    Leading dots and everything from the first "?" onwards are removed
+    first. If the remainder contains a "/", only the host part is
+    IDNA-encoded and the path is appended unchanged.
+
+    domain -- the domain name to encode; it is handed to raise_on() before
+              any processing (see that function for what it rejects)
+
+    Returns the encoded string. Results are memoized through lru_cache.
+    """
+    logger.debug("domain='%s' - CALLED!", domain)
+    raise_on(domain)
+
+    # Drop leading dots and any query string before encoding
+    punycode = domain.lstrip(".").split("?")[0]
+    logger.debug("punycode='%s' - AFTER!", punycode)
+
+    if "/" in punycode:
+        # A scheme is prepended so urlparse() separates host and path
+        components = urlparse("https://" + punycode)
+        logger.debug("components[%s](%d)='%s'", type(components), len(components), components)
+
+        punycode = components.netloc.encode("idna").decode("utf-8") + components.path
+        logger.debug("punycode='%s',domain='%s'", punycode, domain)
+    else:
+        # Encode the cleaned-up value; encoding the raw parameter here would
+        # discard the stripping done above
+        punycode = punycode.encode("idna").decode("utf-8")
+
+    logger.debug("punycode='%s' - EXIT!", punycode)
+    return punycode
index 8c1523c8aebd21c632058f4857c3eb5dd7b3238f..8b72de270eb9115dc0606a8ef6bacf23dfb986fc 100644 (file)
@@ -172,8 +172,7 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path:
             instance = instance.replace("..", ".")
 
         logger.debug("instance='%s' - BEFORE!", instance)
-        instance = instance.encode("idna").decode("utf-8")
-        instance = instance.split("?")[0]
+        instance = domain_helper.encode_idna(instance)
         logger.debug("instance='%s' - AFTER!", instance)
 
         if not domain_helper.is_wanted(instance):
index 66424c36ab0128e2cd025ddf177713ff55b550e5..0c4f32c0d9f402bfb724503b51085f065b31bfc4 100644 (file)
@@ -238,12 +238,12 @@ def translate_idnas(rows: list, column: str):
     for row in rows:
         logger.debug("row[]='%s'", type(row))
 
-        translated = row[column].encode("idna").decode("utf-8")
-        logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+        punycode = domain_helper.encode_idna(row[column])
+        logger.debug("punycode='%s',row[%s]='%s'", punycode, column, row[column])
 
-        if translated != row[column]:
-            logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
-            database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [translated, row[column]])
+        if punycode != row[column]:
+            logger.info("punycode row[%s]='%s' to '%s'", column, row[column], punycode)
+            database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [punycode, row[column]])
 
             logger.debug("Invoking commit() ...")
             database.connection.commit()
index f75bd6b46c55d277ee51d17873652e30596cbfd5..9e5cc8df2a0e804e9ae5b691eb7fd57074478377 100644 (file)
@@ -601,17 +601,17 @@ def translate_idnas(rows: list, column: str):
     for row in rows:
         logger.debug("row[]='%s'", type(row))
 
-        translated = row[column].encode("idna").decode("utf-8")
-        logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+        punycode = domain_helper.encode_idna(row[column])
+        logger.debug("punycode='%s',row[%s]='%s'", punycode, column, row[column])
 
-        if translated != row[column]:
-            logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
-            if is_registered(translated, True):
-                logger.warning("Deleting row[%s]='%s' as translated='%s' already exist", column, row[column], translated)
+        if punycode != row[column]:
+            logger.info("punycode row[%s]='%s' to '%s'", column, row[column], punycode)
+            if is_registered(punycode, True):
+                logger.warning("Deleting row[%s]='%s' as punycode='%s' already exist", column, row[column], punycode)
                 database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [row[column]])
             else:
-                logger.debug("Updating row[%s]='%s' to translated='%s' ...", column, row[column], translated)
-                database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [translated, row[column]])
+                logger.debug("Updating row[%s]='%s' to punycode='%s' ...", column, row[column], punycode)
+                database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [punycode, row[column]])
 
             logger.debug("Invoking commit() ...")
             database.connection.commit()
index 7b87b1316eefe63acdd8599fd345991e9494478e..747da2b6a201f83a49ac0c08a52114907b955763 100644 (file)
@@ -113,7 +113,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
             continue
 
         logger.debug("domain='%s' - BEFORE!", domain)
-        domain = domain.encode("idna").decode("utf-8")
+        domain = domain_helper.encode_idna(domain)
         logger.debug("domain='%s' - AFTER!", domain)
 
         if not domain_helper.is_wanted(domain):