From f0a25c59d9e3bf21957793d1f2dc3f83ddea596a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Wed, 10 Jul 2024 18:34:05 +0200 Subject: [PATCH] Continued: - introduced function domain_helper.encode_idna() which has centralized IDNA encoding --- fba/commands.py | 24 ++++++++++++------------ fba/helpers/domain.py | 24 +++++++++++++++++++++++- fba/http/federation.py | 3 +-- fba/models/blocks.py | 10 +++++----- fba/models/instances.py | 16 ++++++++-------- fba/utils.py | 2 +- 6 files changed, 50 insertions(+), 29 deletions(-) diff --git a/fba/commands.py b/fba/commands.py index 36c1b2e..c64c713 100644 --- a/fba/commands.py +++ b/fba/commands.py @@ -89,7 +89,7 @@ def check_nodeinfo(args: argparse.Namespace) -> int: cnt = 0 for row in database.cursor.fetchall(): logger.debug("Checking row[domain]='%s',row[software]='%s',row[nodeinfo_url]='%s' ...", row["domain"], row["software"], row["nodeinfo_url"]) - punycode = row["domain"].encode("idna").decode("utf-8") + punycode = domain_helper.encode_idna(row["domain"]) if row["nodeinfo_url"].startswith("/"): logger.debug("row[nodeinfo_url]='%s' is a relative URL and always matches", row["nodeinfo_url"]) @@ -153,7 +153,7 @@ def fetch_pixelfed_api(args: argparse.Namespace) -> int: continue logger.debug("row[domain]='%s' - BEFORE!", row["domain"]) - domain = row["domain"].encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(row["domain"]) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -252,7 +252,7 @@ def fetch_bkali(args: argparse.Namespace) -> int: logger.info("Adding %d new instances ...", len(domains)) for domain in domains: logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) try: @@ -426,7 +426,7 @@ def fetch_blocks(args: argparse.Namespace) -> int: continue logger.debug("block[blocked]='%s' - BEFORE!", block["blocked"]) - block["blocked"] = block["blocked"].lstrip(".").encode("idna").decode("utf-8") + block["blocked"] = domain_helper.encode_idna(block["blocked"]) logger.debug("block[blocked]='%s' - AFTER!", block["blocked"]) if not domain_helper.is_wanted(block["blocked"]): @@ -576,7 +576,7 @@ def fetch_observer(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -836,7 +836,7 @@ def fetch_fba_rss(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -923,7 +923,7 @@ def fetch_fbabot_atom(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -1039,7 +1039,7 @@ ORDER BY total_peers DESC, last_response_time ASC, last_updated ASC" logger.info("Checking %d entries ...", len(rows)) for row in rows: logger.debug("row[domain]='%s' - BEFORE!", row["domain"]) - domain = row["domain"].encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(row["domain"]) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -1192,7 +1192,7 @@ def fetch_fedipact(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -1514,7 +1514,7 @@ def fetch_fedilist(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -1678,7 +1678,7 @@ def fetch_instances_social(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): @@ -1747,7 +1747,7 @@ def fetch_relaylist(args: argparse.Namespace) -> int: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): diff --git a/fba/helpers/domain.py b/fba/helpers/domain.py index fcf672a..b3b6f51 100644 --- a/fba/helpers/domain.py +++ b/fba/helpers/domain.py @@ -18,6 +18,7 @@ import logging from functools import lru_cache from urllib.parse import urlparse +from urllib.parse import urlunparse import validators @@ -67,7 +68,8 @@ def is_in_url(domain: str, url: str) -> bool: elif not validators.url(url): raise ValueError(f"Parameter url='{url}' is not a valid URL") - punycode = domain.encode("idna").decode("utf-8") + punycode = encode_idna(domain) + logger.debug("punycode='%s'", punycode) components = urlparse(url) logger.debug("components[]='%s',punycode='%s'", type(components), punycode) @@ -117,3 +119,23 @@ def is_wanted(domain: str) -> bool: logger.debug("wanted='%s' - EXIT!", wanted) return wanted + +@lru_cache +def encode_idna(domain: str) -> str: + logger.debug("domain='%s' - CALLED!") + raise_on(domain) + + punycode = domain.lstrip(".").split("?")[0] + logger.debug("punycode='%s' - AFTER!", punycode) + + if "/" in punycode: + components = urlparse("https://" + punycode) + logger.debug("components[%s](%d)='%s'", type(components), len(components), components) + + punycode = components.netloc.encode("idna").decode("utf-8") + components.path + logger.debug("punycode='%s',domain='%s'", punycode, domain) + else: + punycode = domain.encode("idna").decode("utf-8") + + logger.debug("punycode='%s' - EXIT!", punycode) + return punycode diff --git a/fba/http/federation.py b/fba/http/federation.py index 8c1523c..8b72de2 100644 --- a/fba/http/federation.py +++ b/fba/http/federation.py @@ -172,8 +172,7 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path: instance = instance.replace("..", ".") logger.debug("instance='%s' - BEFORE!", instance) - instance = instance.encode("idna").decode("utf-8") - instance = instance.split("?")[0] + instance = domain_helper.encode_idna(instance) logger.debug("instance='%s' - AFTER!", instance) if not domain_helper.is_wanted(instance): diff --git a/fba/models/blocks.py b/fba/models/blocks.py index 66424c3..0c4f32c 100644 --- a/fba/models/blocks.py +++ b/fba/models/blocks.py @@ -238,12 +238,12 @@ def translate_idnas(rows: list, column: str): for row in rows: logger.debug("row[]='%s'", type(row)) - translated = row[column].encode("idna").decode("utf-8") - logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column]) + punycode = domain_helper.encode_idna(row[column]) + logger.debug("punycode='%s',row[%s]='%s'", punycode, column, row[column]) - if translated != row[column]: - logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated) - database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [translated, row[column]]) + if punycode != row[column]: + logger.info("punycode row[%s]='%s' to '%s'", column, row[column], punycode) + database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [punycode, row[column]]) logger.debug("Invoking commit() ...") database.connection.commit() diff --git a/fba/models/instances.py b/fba/models/instances.py index f75bd6b..9e5cc8d 100644 --- a/fba/models/instances.py +++ b/fba/models/instances.py @@ -601,17 +601,17 @@ def translate_idnas(rows: list, column: str): for row in rows: logger.debug("row[]='%s'", type(row)) - translated = row[column].encode("idna").decode("utf-8") - logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column]) + punycode = domain_helper.encode_idna(row[column]) + logger.debug("punycode='%s',row[%s]='%s'", punycode, column, row[column]) - if translated != row[column]: - logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated) - if is_registered(translated, True): - logger.warning("Deleting row[%s]='%s' as translated='%s' already exist", column, row[column], translated) + if punycode != row[column]: + logger.info("punycode row[%s]='%s' to '%s'", column, row[column], punycode) + if is_registered(punycode, True): + logger.warning("Deleting row[%s]='%s' as punycode='%s' already exist", column, row[column], punycode) database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [row[column]]) else: - logger.debug("Updating row[%s]='%s' to translated='%s' ...", column, row[column], translated) - database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [translated, row[column]]) + logger.debug("Updating row[%s]='%s' to punycode='%s' ...", column, row[column], punycode) + database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [punycode, row[column]]) logger.debug("Invoking commit() ...") database.connection.commit() diff --git a/fba/utils.py b/fba/utils.py index 7b87b13..747da2b 100644 --- a/fba/utils.py +++ b/fba/utils.py @@ -113,7 +113,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: continue logger.debug("domain='%s' - BEFORE!", domain) - domain = domain.encode("idna").decode("utf-8") + domain = domain_helper.encode_idna(domain) logger.debug("domain='%s' - AFTER!", domain) if not domain_helper.is_wanted(domain): -- 2.39.5