elif row["domain"] == "":
logger.debug("row[domain] is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(row["domain"]):
+
+ logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
+ row["domain"] = row["domain"].encode("idna").decode("utf-8")
+ logger.debug("row[domain]='%s' - AFTER!", row["domain"])
+
+ if not utils.is_domain_wanted(row["domain"]):
logger.warning("row[domain]='%s' is not wanted - SKIPPED!", row["domain"])
continue
elif instances.is_registered(row["domain"]):
if len(domains) > 0:
logger.info("Adding %d new instances ...", len(domains))
for domain in domains:
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
try:
logger.info("Fetching instances from domain='%s' ...", domain)
federation.fetch_instances(domain, 'tak.teleyal.blog', None, inspect.currentframe().f_code.co_name)
elif block["blocked"].endswith(".tld"):
logger.debug("blocked='%s' is a fake domain - SKIPPED", block["blocked"])
continue
+ elif "xn--" in block["blocked"]:
+ raise ValueError(f"blocked='{block['blocked']}' is a punycode domain, please don't crawl them!")
elif block["blocked"].find("*") >= 0:
logger.debug("blocker='%s' uses obfuscated domains", blocker)
if block["blocked"] == "":
logger.debug("block[blocked] is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(block["blocked"]):
+
+ logger.debug("block[blocked]='%s' - BEFORE!", block["blocked"])
+ block["blocked"] = block["blocked"].encode("idna").decode("utf-8")
+ logger.debug("block[blocked]='%s' - AFTER!", block["blocked"])
+
+ if not utils.is_domain_wanted(block["blocked"]):
logger.warning("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
continue
elif block["block_level"] in ["accept", "accepted"]:
for item in items:
logger.debug("item[]='%s'", type(item))
domain = item.decode_contents()
-
logger.debug("domain='%s' - AFTER!", domain)
+
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(domain):
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not utils.is_domain_wanted(domain):
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
for row in blocklist[block_level]:
logger.debug("row[%s]='%s'", type(row), row)
- if instances.is_recent(row["domain"], "last_blocked"):
+ if not "domain" in row:
+ logger.warning("row[]='%s' has no element 'domain' - SKIPPED!", type(row))
+ continue
+ elif instances.is_recent(row["domain"], "last_blocked"):
logger.debug("row[domain]='%s' has been recently crawled - SKIPPED!", row["domain"])
continue
elif not instances.is_registered(row["domain"]):
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(domain):
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not utils.is_domain_wanted(domain):
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
if len(domains) > 0:
logger.info("Adding %d new instances ...", len(domains))
for domain in domains:
+ logger.debug("domain='%s'", domain)
try:
logger.info("Fetching instances from domain='%s' ...", domain)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(domain):
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not utils.is_domain_wanted(domain):
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
if row["domain"] == "":
logger.debug("row[domain] is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(row["domain"]):
+
+ logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
+ row["domain"] = row["domain"].encode("idna").decode("utf-8")
+ logger.debug("row[domain]='%s' - AFTER!", row["domain"])
+
+ if not utils.is_domain_wanted(row["domain"]):
logger.warning("Domain row[domain]='%s' is not wanted - SKIPPED!", row["domain"])
continue
elif domain.endswith(".tld"):
logger.debug("domain='%s' is a fake domain - SKIPPED", domain)
continue
+ elif "xn--" in domain:
+ raise ValueError(f"domain='{domain}' is a punycode domain, please translate them back!")
elif domain.find("*") >= 0 or domain.find("?") >= 0:
logger.debug("domain='%s' is obfuscated - Invoking utils.deobfuscate(%s, %s) ...", domain, domain, block["blocker"])
domain = utils.deobfuscate(domain, block["blocker"])
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(domain):
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not utils.is_domain_wanted(domain):
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
elif block["blocked"].endswith(".tld"):
logger.debug("blocked='%s' is a fake domain name - SKIPPED!", block["blocked"])
continue
+ elif "xn--" in block["blocked"]:
+ raise ValueError(f"blocked='{block['blocked']}' is a punycode domain, please translate them back!")
elif block["blocked"].endswith(".onion"):
logger.debug("blocked='%s' is a TOR onion domain name - SKIPPED!", block["blocked"])
continue
if domain == "":
logger.debug("domain is empty after tidyup: row[hostname]='%s' - SKIPPED!", row["hostname"])
continue
- elif not utils.is_domain_wanted(domain):
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not utils.is_domain_wanted(domain):
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif (args.all is None or not args.all) and instances.is_registered(domain):
for row in rows:
logger.debug("row[]='%s'", type(row))
domain = tidyup.domain(row["name"])
-
logger.debug("domain='%s' - AFTER!", domain)
+
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(domain):
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not utils.is_domain_wanted(domain):
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
logger.debug("Success! - EXIT!")
return 0
+
def convert_idna(args: argparse.Namespace) -> int:
    """Run the IDNA translation helpers over stored domain columns.

    For each of instances.domain, instances.origin, blocks.blocker and
    blocks.blocked (in that order) all values not containing 'xn--' are
    selected and handed to the owning module's translate_idnas() helper.

    Args:
        args: Parsed command-line arguments; only their type is logged here.

    Returns:
        Always 0.
    """
    logger.debug("args[]='%s' - CALLED!", type(args))

    # (table, column, helper) - processed strictly in this order
    jobs = (
        ("instances", "domain" , instances.translate_idnas),
        ("instances", "origin" , instances.translate_idnas),
        ("blocks"   , "blocker", blocks.translate_idnas),
        ("blocks"   , "blocked", blocks.translate_idnas),
    )

    for table, column, translate in jobs:
        # Table and column names come from the constant tuple above, never from user input
        database.cursor.execute(f"SELECT {column} FROM {table} WHERE {column} NOT LIKE '%xn--%' ORDER BY {column} ASC")
        rows = database.cursor.fetchall()

        logger.debug("rows[]='%s'", type(rows))
        if len(rows) == 0:
            # translate_idnas() rejects an empty list with ValueError, which
            # would abort the remaining columns; nothing to convert here.
            logger.debug("No rows found for table='%s',column='%s' - SKIPPED!", table, column)
            continue

        translate(rows, column)

    logger.debug("Success! - EXIT!")
    return 0
logger.debug("EXIT!")
-def is_registered(domain: str) -> bool:
- logger.debug("domain='%s' - CALLED!", domain)
- domain_helper.raise_on(domain)
+def is_registered(domain: str, skip_raise = False) -> bool:
+ logger.debug("domain='%s',skip_raise='%s' - CALLED!", domain, skip_raise)
+ if not isinstance(skip_raise, bool):
+ raise ValueError(f"skip_raise[]='%s' is not type of 'bool'", type(skip_raise))
+
+ if not skip_raise:
+ domain_helper.raise_on(domain)
logger.debug("domain='%s' - CALLED!", domain)
if not cache.key_exists("is_registered"):
logger.debug("valid='%s' - EXIT!", valid)
return valid
+
def translate_idnas(rows: list, column: str):
    """Rewrite values of one 'instances' column to their IDNA-encoded form.

    Each row's value is encoded with the "idna" codec. When the encoded
    value differs from the stored one, the stored row is either deleted
    (if is_registered() reports the encoded name as already known) or
    updated to the encoded name. Every change is committed immediately.

    Args:
        rows: Non-empty list of rows; each must support row[column].
        column: Column to translate, either "domain" or "origin".

    Raises:
        ValueError: If 'rows' is not a list or is empty, or 'column' is
            not a string, is empty or is not a supported column.
    """
    logger.debug("rows[]='%s' - CALLED!", type(rows))
    if not isinstance(rows, list):
        raise ValueError(f"rows[]='{type(rows)}' is not of type 'list'")
    elif len(rows) == 0:
        raise ValueError("Parameter 'rows' is an empty list")
    elif not isinstance(column, str):
        raise ValueError(f"column='{type(column)}' is not of type 'str'")
    elif column == "":
        raise ValueError("Parameter 'column' is empty")
    elif column not in ["domain", "origin"]:
        # The column name is interpolated into SQL below, so only this whitelist is allowed
        raise ValueError(f"column='{column}' is not supported")

    logger.info("Checking/converting %d domain names ...", len(rows))
    for row in rows:
        logger.debug("row[]='%s'", type(row))

        translated = row[column].encode("idna").decode("utf-8")
        logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])

        if translated != row[column]:
            logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)

            # NOTE(review): if the backend is SQLite, LIMIT on DELETE/UPDATE is
            # only accepted by builds compiled with
            # SQLITE_ENABLE_UPDATE_DELETE_LIMIT - confirm the deployed build.
            # NOTE(review): for column='origin' the delete branch removes a row
            # by its origin value although is_registered() was asked about the
            # translated name - verify this is intended.
            if is_registered(translated, True):
                logger.warning("Deleting row[%s]='%s' as translated='%s' already exist", column, row[column], translated)
                database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [row[column]])
            else:
                database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [translated, row[column]])

            logger.debug("Invoking commit() ...")
            database.connection.commit()

    logger.debug("EXIT!")