cnt = 0
for row in database.cursor.fetchall():
logger.debug("Checking row[domain]='%s',row[software]='%s',row[nodeinfo_url]='%s' ...", row["domain"], row["software"], row["nodeinfo_url"])
- punycode = row["domain"].encode("idna").decode("utf-8")
+ punycode = domain_helper.encode_idna(row["domain"])
if row["nodeinfo_url"].startswith("/"):
logger.debug("row[nodeinfo_url]='%s' is a relative URL and always matches", row["nodeinfo_url"])
continue
logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
- domain = row["domain"].encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(row["domain"])
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
logger.info("Adding %d new instances ...", len(domains))
for domain in domains:
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
try:
continue
logger.debug("block[blocked]='%s' - BEFORE!", block["blocked"])
- block["blocked"] = block["blocked"].lstrip(".").encode("idna").decode("utf-8")
+ block["blocked"] = domain_helper.encode_idna(block["blocked"])
logger.debug("block[blocked]='%s' - AFTER!", block["blocked"])
if not domain_helper.is_wanted(block["blocked"]):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
logger.info("Checking %d entries ...", len(rows))
for row in rows:
logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
- domain = row["domain"].encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(row["domain"])
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):
from functools import lru_cache
from urllib.parse import urlparse
+from urllib.parse import urlunparse
import validators
elif not validators.url(url):
raise ValueError(f"Parameter url='{url}' is not a valid URL")
- punycode = domain.encode("idna").decode("utf-8")
+ punycode = encode_idna(domain)
+ logger.debug("punycode='%s'", punycode)
components = urlparse(url)
logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
logger.debug("wanted='%s' - EXIT!", wanted)
return wanted
+
+@lru_cache
+def encode_idna(domain: str) -> str:
+ logger.debug("domain='%s' - CALLED!")
+ raise_on(domain)
+
+ punycode = domain.lstrip(".").split("?")[0]
+ logger.debug("punycode='%s' - AFTER!", punycode)
+
+ if "/" in punycode:
+ components = urlparse("https://" + punycode)
+ logger.debug("components[%s](%d)='%s'", type(components), len(components), components)
+
+ punycode = components.netloc.encode("idna").decode("utf-8") + components.path
+ logger.debug("punycode='%s',domain='%s'", punycode, domain)
+ else:
+ punycode = domain.encode("idna").decode("utf-8")
+
+ logger.debug("punycode='%s' - EXIT!", punycode)
+ return punycode
instance = instance.replace("..", ".")
logger.debug("instance='%s' - BEFORE!", instance)
- instance = instance.encode("idna").decode("utf-8")
- instance = instance.split("?")[0]
+ instance = domain_helper.encode_idna(instance)
logger.debug("instance='%s' - AFTER!", instance)
if not domain_helper.is_wanted(instance):
for row in rows:
logger.debug("row[]='%s'", type(row))
- translated = row[column].encode("idna").decode("utf-8")
- logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+ punycode = domain_helper.encode_idna(row[column])
+ logger.debug("punycode='%s',row[%s]='%s'", punycode, column, row[column])
- if translated != row[column]:
- logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
- database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [translated, row[column]])
+ if punycode != row[column]:
+ logger.info("punycode row[%s]='%s' to '%s'", column, row[column], punycode)
+ database.cursor.execute(f"UPDATE blocks SET {column} = ? WHERE {column} = ?", [punycode, row[column]])
logger.debug("Invoking commit() ...")
database.connection.commit()
for row in rows:
logger.debug("row[]='%s'", type(row))
- translated = row[column].encode("idna").decode("utf-8")
- logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+ punycode = domain_helper.encode_idna(row[column])
+ logger.debug("punycode='%s',row[%s]='%s'", punycode, column, row[column])
- if translated != row[column]:
- logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
- if is_registered(translated, True):
- logger.warning("Deleting row[%s]='%s' as translated='%s' already exist", column, row[column], translated)
+ if punycode != row[column]:
+ logger.info("punycode row[%s]='%s' to '%s'", column, row[column], punycode)
+ if is_registered(punycode, True):
+ logger.warning("Deleting row[%s]='%s' as punycode='%s' already exist", column, row[column], punycode)
database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [row[column]])
else:
- logger.debug("Updating row[%s]='%s' to translated='%s' ...", column, row[column], translated)
- database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [translated, row[column]])
+ logger.debug("Updating row[%s]='%s' to punycode='%s' ...", column, row[column], punycode)
+ database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [punycode, row[column]])
logger.debug("Invoking commit() ...")
database.connection.commit()
continue
logger.debug("domain='%s' - BEFORE!", domain)
- domain = domain.encode("idna").decode("utf-8")
+ domain = domain_helper.encode_idna(domain)
logger.debug("domain='%s' - AFTER!", domain)
if not domain_helper.is_wanted(domain):