]
for request in request_paths:
- logger.debug(f"path[{type(path)}]='{path}',request='{request}'")
+ logger.debug("path[%s]='%s',request='%s'", type(path), path, request)
if path is None or path == request or path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
logger.debug("Fetching request='%s' from domain='%s' ...", request, domain)
- if path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
- logger.debug(f"domain='{domain}',path='{path}' has protocol in path, splitting ...")
+ if path in [f"http://{domain}{path}", f"https://{domain}{path}"]:
+ logger.debug("domain='%s',path='%s' has protocol in path, splitting ...", domain, path)
components = urlparse(path)
path = components.path
instances.set_nodeinfo_url(domain, request)
break
- logger.warning(f"Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
+ logger.warning("Failed fetching nodeinfo from domain='%s',status_code='%s',error_message='%s'", domain, data['status_code'], data['error_message'])
logger.debug("data()=%d - EXIT!", len(data))
return data
logger.debug("Checking CSRF for domain='%s'", domain)
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during checking CSRF (fetch_wellknown_nodeinfo,{__name__}) - EXIT!")
+ logger.warning("Exception '%s' during checking CSRF (fetch_wellknown_nodeinfo,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
return {
"status_code" : 500,
if "error_message" not in data:
nodeinfo = data["json"]
- logger.debug("Found entries:", len(nodeinfo), domain)
+ logger.debug("Found entries: nodeinfo()=%d,domain='%s'", len(nodeinfo), domain)
if "links" in nodeinfo:
- logger.debug("Found links in nodeinfo():", len(nodeinfo["links"]))
+ logger.debug("Found nodeinfo[links]()=%d record(s)", len(nodeinfo["links"]))
for link in nodeinfo["links"]:
logger.debug("link[%s]='%s'", type(link), link)
if not isinstance(link, dict) or not "rel" in link:
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug("href,data[]:", link["href"], type(data))
+ logger.debug("link[href]='%s',data[]='%s'", link["href"], type(data))
if "error_message" not in data and "json" in data:
logger.debug("Found JSON nodeinfo()=%d", len(data))
instances.set_detection_mode(domain, "AUTO_DISCOVERY")
else:
logger.warning("nodeinfo does not contain 'links': domain='%s'", domain)
- logger.debug("Returning data[]:", type(data))
+ logger.debug("Returning data[]='%s' - EXIT!", type(data))
return data
def fetch_generator_from_path(domain: str, path: str = "/") -> str:
domain_helper.raise_on(domain)
if not isinstance(path, str):
- raise ValueError(f"path[]='{type(path)}' is not 'str'")
+ raise ValueError(f"path[]='{type(path)}' is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' is empty")
logger.debug("generator[]='%s',site_name[]='%s'", type(generator), type(site_name))
if isinstance(generator, bs4.element.Tag) and isinstance(generator.get("content"), str):
- logger.debug("Found generator meta tag:", domain)
+ logger.debug("Found generator meta tag: domain='%s'", domain)
software = tidyup.domain(generator.get("content"))
logger.debug("software[%s]='%s'", type(software), software)
logger.debug("Determining software for domain,path:", domain, path)
software = None
- logger.debug(f"Fetching nodeinfo from '{domain}' ...")
+ logger.debug("Fetching nodeinfo from domain='%s' ...", domain)
data = fetch_nodeinfo(domain, path)
- logger.debug(f"data[{type(data)}]='{data}'")
+ logger.debug("data[]='%s'", type(data))
if "exception" in data:
# Continue raising it
raise data["exception"]
elif "error_message" in data:
- logger.debug(f"Returned error_message during fetching nodeinfo: '{data['error_message']}',status_code='{data['status_code']}'")
+ logger.debug("Returned error_message during fetching nodeinfo: '%s',status_code='%d'", data['error_message'], data['status_code'])
return fetch_generator_from_path(domain)
elif "status" in data and data["status"] == "error" and "message" in data:
- logger.warning("JSON response is an error:", data["message"])
+ logger.warning("JSON response is an error: '%s'", data["message"])
instances.set_last_error(domain, data["message"])
return fetch_generator_from_path(domain)
elif "message" in data:
- logger.warning("JSON response contains only a message:", data["message"])
+ logger.warning("JSON response contains only a message: '%s'", data["message"])
instances.set_last_error(domain, data["message"])
return fetch_generator_from_path(domain)
elif "software" not in data or "name" not in data["software"]:
- logger.debug(f"JSON response from domain='{domain}' does not include [software][name], fetching / ...")
+ logger.debug("JSON response from domain='%s' does not include [software][name], fetching / ...", domain)
software = fetch_generator_from_path(domain)
- logger.debug(f"Generator for domain='{domain}' is: '{software}'")
+ logger.debug("Generator for domain='%s' is: '%s'", domain, software)
elif "software" in data and "name" in data["software"]:
logger.debug("Found data[software][name] in JSON response")
software = data["software"]["name"]
logger.debug("Returning None - EXIT!")
return None
+ logger.debug("software='%s'- BEFORE!", software)
software = tidyup.domain(software)
- logger.debug("sofware after tidyup.domain(): software='%s'", software)
+ logger.debug("software='%s'- AFTER!", software)
if software in ["akkoma", "rebased", "akkounfucked", "ched"]:
- logger.debug("Setting pleroma:", domain, software)
+ logger.debug("Setting pleroma: domain='%s',software='%s'", domain, software)
software = "pleroma"
elif software in ["hometown", "ecko"]:
- logger.debug("Setting mastodon:", domain, software)
+ logger.debug("Setting mastodon: domain='%s',software='%s'", domain, software)
software = "mastodon"
elif software in ["slipfox calckey", "calckey", "groundpolis", "foundkey", "cherrypick", "meisskey", "magnetar", "keybump"]:
- logger.debug("Setting misskey:", domain, software)
+ logger.debug("Setting misskey: domain='%s',software='%s'", domain, software)
software = "misskey"
elif software == "runtube.re":
- logger.debug("Setting peertube:", domain, software)
+ logger.debug("Setting peertube: domain='%s',software='%s'", domain, software)
software = "peertube"
elif software == "nextcloud social":
- logger.debug("Setting nextcloud:", domain, software)
+ logger.debug("Setting nextcloud: domain='%s',software='%s'", domain, software)
software = "nextcloud"
elif software.find("/") > 0:
- logger.warning("Spliting of slash: software='%s'", software)
+ logger.warning("Spliting of slash: domain='%s',software='%s'", domain, software)
software = tidyup.domain(software.split("/")[-1])
elif software.find("|") > 0:
- logger.warning("Spliting of pipe: software='%s'", software)
+ logger.warning("Spliting of pipe: domain='%s',software='%s'", domain, software)
software = tidyup.domain(software.split("|")[0])
elif "powered by" in software:
- logger.debug(f"software='{software}' has 'powered by' in it")
+ logger.debug("software='%s' has 'powered by' in it", software)
software = version.strip_powered_by(software)
elif isinstance(software, str) and " by " in software:
- logger.debug(f"software='{software}' has ' by ' in it")
+ logger.debug("software='%s' has ' by ' in it", software)
software = version.strip_until(software, " by ")
elif isinstance(software, str) and " see " in software:
- logger.debug(f"software='{software}' has ' see ' in it")
+ logger.debug("software='%s' has ' see ' in it", software)
software = version.strip_until(software, " see ")
- logger.debug("software[]='%s'", type(software))
+ logger.debug("software[%s]='%s'", type(software), software)
if software == "":
- logger.warning("tidyup.domain() left no software name behind:", domain)
+ logger.warning("tidyup.domain() left no software name behind: domain='%s'", domain)
software = None
logger.debug("software[]='%s'", type(software))
logger.debug("software='%s' has 'powered by' in it", software)
software = version.remove(version.strip_powered_by(software))
- logger.debug("Returning domain,software:", domain, software)
+ logger.debug("Returning domain='%s',software='%s'", domain, software)
return software
def find_domains(tag: bs4.element.Tag) -> list:
logger.debug(f"Fetching nodeinfo: domain='{domain}',nodeinfo_url='{nodeinfo_url}'")
rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching nodeinfo")
+ logger.warning("Exception '%s' during fetching nodeinfo from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
if rows is None:
logger.warning("Could not fetch nodeinfo from domain:", domain)
return
elif "metadata" not in rows:
- logger.warning(f"rows()={len(rows)} does not have key 'metadata', domain='{domain}'")
+ logger.warning("rows()=%d does not have key 'metadata', domain='%s'", len(rows), domain)
return
elif "federation" not in rows["metadata"]:
- logger.warning(f"rows()={len(rows['metadata'])} does not have key 'federation', domain='{domain}'")
+ logger.warning("rows()=%d does not have key 'federation', domain='%s'", len(rows['metadata']), domain)
return
data = rows["metadata"]["federation"]
}
}
).items():
- logger.debug("block_level, blocklist():", block_level, len(blocklist))
+ logger.debug("block_level='%s', blocklist()=%d", block_level, len(blocklist))
block_level = tidyup.domain(block_level)
- logger.debug("BEFORE block_level:", block_level)
+ logger.debug("block_level='%s' - AFTER!", block_level)
if block_level == "":
logger.warning("block_level is now empty!")
logger.debug("domain='%s' skipping block_level='accept'", domain)
continue
- logger.debug(f"Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
+ logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(blocklist), domain, block_level)
if len(blocklist) > 0:
for blocked in blocklist:
- logger.debug("BEFORE blocked:", blocked)
+ logger.debug("blocked='%s' - BEFORE!", blocked)
blocked = tidyup.domain(blocked)
- logger.debug("AFTER blocked:", blocked)
+ logger.debug("blocked='%s' - AFTER!", blocked)
if blocked == "":
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
+ elif blocked.endswith(".arpa"):
+ logger.debug("blocked='%s' is a reverse IP address - SKIPPED!", blocked)
+ continue
+ elif blocked.endswith(".tld"):
+ logger.debug("blocked='%s' is a fake domain - SKIPPED!", blocked)
+ continue
elif blacklist.is_blacklisted(blocked):
logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
- logger.debug(f"blocked='{blocked}'")
+ logger.debug("blocked='%s'", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug("Invoking commit() ...")
database.connection.commit()
- logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking:", domain, blocked, block_level)
+ logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.add_instance(domain, blocked, None, block_level)
if block_level == "reject":
- logger.debug("Adding to blockdict:", blocked)
+ logger.debug("Appending blocked='%s' ...", blocked)
blockdict.append({
"blocked": blocked,
"reason" : None
})
else:
- logger.debug(f"Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
+ logger.debug("Updating block last seen for domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.update_last_seen(domain, blocked, block_level)
elif "quarantined_instances" in data:
- logger.debug(f"Found 'quarantined_instances' in JSON response: domain='{domain}'")
+ logger.debug("Found 'quarantined_instances' in JSON response: domain='%s'", domain)
found = True
block_level = "quarantined"
for blocked in data["quarantined_instances"]:
- logger.debug("BEFORE blocked:", blocked)
+ logger.debug("blocked='%s' - BEFORE!", blocked)
blocked = tidyup.domain(blocked)
- logger.debug("AFTER blocked:", blocked)
+ logger.debug("blocked='%s' - AFTER!", blocked)
if blocked == "":
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
+ elif blocked.endswith(".arpa"):
+ logger.debug("blocked='%s' is a reverse IP address - SKIPPED!", blocked)
+ continue
+ elif blocked.endswith(".tld"):
+ logger.debug("blocked='%s' is a fake domain - SKIPPED!", blocked)
+ continue
elif blacklist.is_blacklisted(blocked):
logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
- logger.debug(f"blocked='{blocked}'")
+ logger.debug("blocked='%s' - DEOBSFUCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug("Invoking commit() ...")
database.connection.commit()
- logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking:", domain, blocked, block_level)
+ logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.add_instance(domain, blocked, None, block_level)
if block_level == "reject":
- logger.debug("Adding to blockdict:", blocked)
+ logger.debug("Appending blocked='%s' ...", blocked)
blockdict.append({
"blocked": blocked,
"reason" : None
})
else:
- logger.debug(f"Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
+ logger.debug("Updating block last seen for domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.update_last_seen(domain, blocked, block_level)
else:
- logger.warning(f"Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='{domain}'")
+ logger.warning("Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='%s'", domain)
logger.debug("Invoking commit() ...")
database.connection.commit()
# Reasons
if "mrf_simple_info" in data:
- logger.debug("Found mrf_simple_info:", domain)
+ logger.debug("Found mrf_simple_info in API response: domain='%s'", domain)
found = True
for block_level, info in (
{
**(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
}
).items():
- logger.debug("block_level, info.items():", block_level, len(info.items()))
+ logger.debug("block_level='%s', info.items()=%d", block_level, len(info.items()))
block_level = tidyup.domain(block_level)
- logger.debug("BEFORE block_level:", block_level)
+ logger.debug("block_level='%s' - AFTER!", block_level)
if block_level == "":
logger.warning("block_level is now empty!")
logger.debug("domain='%s' skipping block_level='accept'", domain)
continue
- logger.debug(f"Checking {len(info.items())} entries from domain='{domain}',block_level='{block_level}' ...")
+ logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(info.items()), domain, block_level)
for blocked, reason in info.items():
- logger.debug(f"blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
+ logger.debug("blocked='%s',reason[%s]='%s' - BEFORE!", blocked, type(reason), reason)
blocked = tidyup.domain(blocked)
+ logger.debug("blocked='%s' - AFTER!", blocked)
if isinstance(reason, str):
logger.debug("reason[] is a string")
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
- logger.debug(f"blocked='{blocked}'")
+ logger.debug("blocked='%s' - DEOBSFUCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
- logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- logger.debug(f"Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
+ logger.debug("Updating block reason: reason='%s',domain='%s',blocked='%s',block_level='%s'", reason, domain, blocked, block_level)
blocks.update_reason(reason, domain, blocked, block_level)
- logger.debug(f"blockdict()={len(blockdict)}")
+ logger.debug("Checking %d blockdict records ...", len(blockdict))
for entry in blockdict:
if entry["blocked"] == blocked:
- logger.debug(f"Updating entry reason: blocked='{blocked}',reason='{reason}'")
+ logger.debug("Updating entry reason: blocked='%s',reason='%s'", blocked, reason)
entry["reason"] = reason
elif "quarantined_instances_info" in data and "quarantined_instances" in data["quarantined_instances_info"]:
- logger.debug(f"Found 'quarantined_instances_info' in JSON response: domain='{domain}'")
+ logger.debug("Found 'quarantined_instances_info' in JSON response: domain='%s'", domain)
found = True
block_level = "quarantined"
#print(data["quarantined_instances_info"])
rows = data["quarantined_instances_info"]["quarantined_instances"]
for blocked in rows:
- logger.debug("BEFORE blocked:", blocked)
+ logger.debug("blocked='%s' - BEFORE!", blocked)
blocked = tidyup.domain(blocked)
- logger.debug("AFTER blocked:", blocked)
+ logger.debug("blocked='%s' - AFTER!", blocked)
if blocked not in rows or "reason" not in rows[blocked]:
- logger.warning(f"Cannot find blocked='{blocked}' in rows()={len(rows)},domain='{domain}'")
+ logger.warning("Cannot find blocked='%s' in rows()=%d,domain='%s' - BREAK!", blocked, len(rows), domain)
break
reason = rows[blocked]["reason"]
- logger.debug(f"reason='{reason}'")
+ logger.debug("reason='%s'", reason)
if blocked == "":
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
- logger.debug(f"blocked='{blocked}'")
+ logger.debug("blocked='%s'", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
- logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- logger.debug(f"Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
+ logger.debug("Updating block reason: reason='%s',domain='%s',blocked='%s',block_level='%s'", reason, domain, blocked, block_level)
blocks.update_reason(reason, domain, blocked, block_level)
- logger.debug(f"blockdict()={len(blockdict)}")
+ logger.debug("Checking %d blockdict records ...", len(blockdict))
for entry in blockdict:
if entry["blocked"] == blocked:
- logger.debug(f"Updating entry reason: blocked='{blocked}',reason='{reason}'")
+ logger.debug("Updating entry reason: blocked='%s',reason='%s'", blocked, reason)
entry["reason"] = reason
else:
- logger.warning(f"Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='{domain}'")
+ logger.warning("Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='%s'", domain)
if not found:
- logger.debug(f"Did not find any useable JSON elements, domain='{domain}', continuing with /about page ...")
+ logger.debug("Did not find any useable JSON elements, domain='%s', continuing with /about page ...", domain)
blocklist = fetch_blocks_from_about(domain)
- logger.debug(f"blocklist()={len(blocklist)}")
+ logger.debug("blocklist()=%d", len(blocklist))
if len(blocklist) > 0:
logger.info("Checking %d record(s) ...", len(blocklist))
for block_level in blocklist:
logger.debug("block_level='%s'", block_level)
rows = blocklist[block_level]
- logger.debug(f"rows['{type(rows)}]()={len(rows)}'")
+ logger.debug("rows[%s]()=%d", type(rows), len(rows))
for record in rows:
- logger.debug(f"record[]='{type(record)}'")
+ logger.debug("record[]='%s'", type(record))
blocked = tidyup.domain(record["blocked"])
reason = tidyup.reason(record["reason"])
logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
- logger.debug(f"blocked='{blocked}' de-obscured to '{row[0]}'")
+ logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
- logger.debug(f"blocked='{blocked}'")
+ logger.debug("blocked='%s' - DEOBSFUCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.warning("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
- logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug("Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking:", domain, blocked, block_level)
+ logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.add_instance(domain, blocked, reason, block_level)
if block_level == "reject":
- logger.debug("Adding to blockdict:", blocked)
+ logger.debug("Appending blocked='%s' ...", blocked)
blockdict.append({
"blocked": blocked,
"reason" : reason
})
else:
- logger.debug(f"Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
+ logger.debug("Updating block reason for domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.update_reason(reason, domain, blocked, block_level)
logger.debug("Invoking commit() ...")
logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
domain_helper.raise_on(domain)
- logger.debug(f"Fetching mastodon blocks from domain='{domain}'")
+ logger.debug("Fetching mastodon blocks from domain='%s'", domain)
doc = None
for path in ["/instance/about/index.html"]:
try:
# Resetting doc type
doc = None
- logger.debug(f"Fetching path='{path}' from domain='{domain}' ...")
+ logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
response = network.fetch_response(
domain,
path,
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"response.ok='{response.ok}',response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ logger.debug("response.ok='%s',response.status_code='%d',response.text()=%d", response.ok, response.status_code, len(response.text))
if not response.ok or response.text.strip() == "":
- logger.warning(f"path='{path}' does not exist on domain='{domain}' - SKIPPED!")
+ logger.warning("path='%s' does not exist on domain='%s' - SKIPPED!", path, domain)
continue
- logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
+ logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
doc = bs4.BeautifulSoup(
response.text,
"html.parser",
logger.debug("doc[]='%s'", type(doc))
if doc.find("h2") is not None:
- logger.debug(f"Found 'h2' header in path='{path}' - BREAK!")
+ logger.debug("Found 'h2' header in path='%s' - BREAK!", path)
break
except network.exceptions as exception:
logger.debug("doc[]='%s'", type(doc))
if doc is None:
- logger.warning(f"Cannot fetch any /about pages for domain='{domain}' - EXIT!")
+ logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
return blocklist
for header in doc.find_all("h2"):
header_text = tidyup.reason(header.text)
- logger.debug(f"header_text='{header_text}' - BEFORE!")
+ logger.debug("header_text='%s' - BEFORE!", header_text)
if header_text in language_mapping:
- logger.debug(f"header_text='{header_text}' - FOUND!")
+ logger.debug("header_text='%s' - FOUND!", header_text)
header_text = language_mapping[header_text]
else:
- logger.warning(f"header_text='{header_text}' not found in language mapping table")
+ logger.warning("header_text='%s' not found in language mapping table", header_text)
- logger.debug(f"header_text='{header_text} - AFTER!'")
+ logger.debug("header_text='%s' - AFTER!", header_text)
if header_text in blocklist or header_text.lower() in blocklist:
# replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
- logger.debug(f"Found header_text='{header_text}', importing domain blocks ...")
+ logger.debug("Found header_text='%s', importing domain blocks ...", header_text)
for line in header.find_next("table").find_all("tr")[1:]:
- logger.debug(f"line[]='{type(line)}'")
+ logger.debug("line[]='%s'", type(line))
blocklist[header_text].append({
"blocked": tidyup.domain(line.find_all("td")[0].text),
"reason" : tidyup.reason(line.find_all("td")[1].text),
})
else:
- logger.warning(f"header_text='{header_text}' not found in blocklist()={len(blocklist)}")
+ logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist))
- logger.debug(f"Returning blocklist for domain='{domain}'")
+ logger.debug("Returning blocklist for domain='%s' ...", domain)
return {
"reject" : blocklist["Suspended servers"],
"media_removal" : blocklist["Filtered media"],