domains = list()
- logger.info(f"Fetching ATOM feed='{feed}' from FBA bot account ...")
+ logger.info("Fetching ATOM feed='%s' from FBA bot account ...", feed)
response = utils.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if len(domains) > 0:
locking.acquire()
- logger.info(f"Adding {len(domains)} new instances ...")
+ logger.info("Adding %d new instances ...", len(domains))
for domain in domains:
try:
- logger.info(f"Fetching instances from domain='{domain}' ...")
+ logger.info("Fetching instances from domain='%s' ...", domain)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
logger.debug("Invoking cookies.clear(%s) ...", domain)
# Initial fetch
try:
- logger.info(f"Fetching instances from args.domain='{args.domain}' ...")
+ logger.info("Fetching instances from args.domain='%s' ...", args.domain)
federation.fetch_instances(args.domain, None, None, inspect.currentframe().f_code.co_name)
logger.debug(f"Invoking cookies.clear({args.domain}) ...")
except network.exceptions as exception:
logger.warning("Exception '%s' during fetching instances (fetch_instances) from args.domain='%s'", type(exception), args.domain)
instances.set_last_error(args.domain, exception)
-
return 100
if args.single:
rows = database.cursor.fetchall()
logger.info("Checking %d entries ...", len(rows))
for row in rows:
- logger.debug(f"domain='{row[0]}'")
+ logger.debug("domain='%s'", row[0])
if blacklist.is_blacklisted(row[0]):
logger.warning("domain is blacklisted: row[0]='%s'", row[0])
continue
try:
- logger.info(f"Fetching instances for instance '{row[0]}' ('{row[2]}') of origin='{row[1]}',nodeinfo_url='{row[3]}'")
+ logger.info("Fetching instances for instance domain='%s',software='%s',origin='%s',nodeinfo_url='%s'", row[0], row[2], row[1], row[3])
federation.fetch_instances(row[0], row[1], row[2], inspect.currentframe().f_code.co_name, row[3])
logger.debug(f"Invoking cookies.clear({row[0]}) ...")
)
domains = list()
+
+ logger.debug("Downloading %d files ...", len(blocklists))
for block in blocklists:
# Is a domain given and does it differ from the blocker?
if isinstance(args.domain, str) and args.domain != block["blocker"]:
- logger.debug(f"Skipping blocker='{block['blocker']}', not matching args.domain='{args.domain}'")
+ logger.debug("Skipping blocker='%s', not matching args.domain='%s'", block['blocker'], args.domain)
continue
elif args.domain in domains:
- logger.debug(f"args.domain='{args.domain}' already handled - SKIPPED!")
+ logger.debug("args.domain='%s' already handled - SKIPPED!", args.domain)
continue
# Fetch this URL
- logger.info(f"Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
+ logger.info("Fetching csv_url='%s' for blocker='%s' ...", block['csv_url'], block['blocker'])
response = utils.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
- if response.ok and response.content != "":
+ if response.ok and response.content != b"":
- logger.debug(f"Fetched {len(response.content)} Bytes, parsing CSV ...")
+ logger.debug("Fetched %d Bytes, parsing CSV ...", len(response.content))
reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
- logger.debug(f"reader[]='{type(reader)}'")
+ logger.debug("reader[]='%s'", type(reader))
for row in reader:
domain = None
if "#domain" in row:
logger.debug(f"row='{row}' does not contain domain column")
continue
+ logger.debug("domain='%s'", domain)
if not utils.is_domain_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
- logger.debug(f"Marking domain='{domain}' as handled")
+ logger.debug("Marking domain='%s' as handled", domain)
domains.append(domain)
- logger.debug(f"Processing domain='{domain}' ...")
+ logger.debug("Processing domain='%s' ...", domain)
processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
- logger.debug(f"processed='{processed}'")
+ logger.debug("processed='%s'", processed)
logger.debug("EXIT!")
"https://seirdy.one/pb/bsl.txt",
)
- logger.info(f"Checking {len(urls)} text file(s) ...")
+ logger.info("Checking %d text file(s) ...", len(urls))
for url in urls:
logger.debug("Fetching url='%s' ...", url)
response = utils.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "":
- logger.debug(f"Returned {len(response.text.strip())} Bytes for processing")
+ logger.debug("Returned %d Bytes for processing", len(response.text.strip()))
domains = response.text.split("\n")
- logger.info(f"Processing {len(domains)} domains ...")
+ logger.info("Processing %d domains ...", len(domains))
for domain in domains:
logger.debug("domain='%s'", domain)
if domain == "":
logger.debug("domain='%s'", domain)
processed = utils.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
- logger.debug(f"processed='{processed}'")
+ logger.debug("processed='%s'", processed)
if not processed:
logger.debug(f"domain='{domain}' was not generically processed - SKIPPED!")
continue
else:
logger.warning("Cannot parse data[json][]='%s'", type(data['json']))
- logger.debug(f"Adding '{len(peers)}' for domain='{domain}'")
+ logger.debug("Adding %d for domain='%s'", len(peers), domain)
instances.set_total_peers(domain, peers)
- logger.debug("Returning peers[]:", type(peers))
+ logger.debug("Returning peers[]='%s' - EXIT!", type(peers))
return peers
def fetch_nodeinfo(domain: str, path: str = None) -> dict:
- logger.debug(f"domain='{domain}',path='{path}' - CALLED!")
+ logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
elif not isinstance(path, str) and path is not None:
raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
- logger.debug(f"Fetching nodeinfo from domain='{domain}' ...")
+ logger.debug("Fetching nodeinfo from domain='%s' ...", domain)
nodeinfo = fetch_wellknown_nodeinfo(domain)
- logger.debug(f"nodeinfo[{type(nodeinfo)}]({len(nodeinfo)}='{nodeinfo}'")
+ logger.debug("nodeinfo[%s]({len(nodeinfo)}='%s'", type(nodeinfo), nodeinfo)
if "error_message" not in nodeinfo and "json" in nodeinfo and len(nodeinfo["json"]) > 0:
- logger.debug(f"Found nodeinfo[json]()={len(nodeinfo['json'])} - EXIT!")
+ logger.debug("Found nodeinfo[json]()=%d - EXIT!", len(nodeinfo['json']))
return nodeinfo["json"]
# No CSRF headers by default; there is no need to add network.api_headers manually here
logger.debug("Checking CSRF for domain='%s'", domain)
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during checking CSRF (nodeinfo,{__name__}) - EXIT!")
+ logger.warning("Exception '%s' during checking CSRF (nodeinfo,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
return {
"status_code" : 500,
for request in request_paths:
logger.debug(f"path[{type(path)}]='{path}',request='{request}'")
- if path is None or path == request or path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
+ if path is None or path == request or path == f"http://{domain}{request}" or path == f"https://{domain}{request}":
- logger.debug(f"Fetching request='{request}' from domain='{domain}' ...")
+ logger.debug("Fetching request='%s' from domain='%s' ...", request, domain)
if path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
logger.debug(f"domain='{domain}',path='{path}' has protocol in path, splitting ...")
components = urlparse(path)
if "links" in nodeinfo:
logger.debug("Found links in nodeinfo():", len(nodeinfo["links"]))
for link in nodeinfo["links"]:
- logger.debug(f"link[{type(link)}]='{link}'")
+ logger.debug("link[%s]='%s'", type(link), link)
if not isinstance(link, dict) or not "rel" in link:
- logger.warning(f"link[]='{type(link)}' is not 'dict' or no element 'rel' found")
+ logger.warning("link[]='%s' is not 'dict' or no element 'rel' found", type(link))
elif link["rel"] in nodeinfo_identifier:
# By default 'href' holds a complete URL, but some hosts send only a relative path
url = link["href"]
components = urlparse(link["href"])
- logger.debug(f"components[{type(components)}]='{components}'")
+ logger.debug("components[%s]='%s'", type(components), components)
if components.scheme == "" and components.netloc == "":
- logger.debug(f"link[href]='{link['href']}' has no scheme and host name in it, prepending from domain='{domain}'")
+ logger.debug("link[href]='%s' has no scheme and host name in it, prepending from domain='%s'", link['href'], domain)
url = f"https://{domain}{url}"
components = urlparse(url)
logger.debug("href,data[]:", link["href"], type(data))
if "error_message" not in data and "json" in data:
- logger.debug("Found JSON nodeinfo():", len(data))
+ logger.debug("Found JSON nodeinfo()=%d", len(data))
instances.set_detection_mode(domain, "AUTO_DISCOVERY")
instances.set_nodeinfo_url(domain, link["href"])
break
else:
instances.set_last_error(domain, data)
else:
- logger.warning("Unknown 'rel' value:", domain, link["rel"])
+ logger.warning("Unknown 'rel' value: domain='%s',link[rel]='%s'", domain, link["rel"])
else:
- logger.warning("nodeinfo does not contain 'links':", domain)
+ logger.warning("nodeinfo does not contain 'links': domain='%s'", domain)
logger.debug("Returning data[]:", type(data))
return data
elif path == "":
raise ValueError("Parameter 'path' is empty")
- logger.debug(f"domain='{domain}',path='{path}' - CALLED!")
+ logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
software = None
- logger.debug(f"Fetching path='{path}' from '{domain}' ...")
+ logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text.find("<html") > 0:
- logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
-
+ logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
doc = bs4.BeautifulSoup(response.text, "html.parser")
- logger.debug("doc[]='%s'", type(doc))
+ logger.debug("doc[]='%s'", type(doc))
generator = doc.find("meta", {"name" : "generator"})
site_name = doc.find("meta", {"property": "og:site_name"})
logger.debug("Corrected empty string to None for software of domain='%s'", domain)
software = None
elif isinstance(software, str) and ("." in software or " " in software):
- logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
+ logger.debug("software='%s' may contain a version number, domain='{domain}', removing it ...", software)
software = version.remove(software)
logger.debug("software[]='%s'", type(software))
if isinstance(software, str) and "powered by " in software:
- logger.debug(f"software='{software}' has 'powered by' in it")
+ logger.debug("software='%s' has 'powered by' in it", software)
software = version.remove(version.strip_powered_by(software))
elif isinstance(software, str) and " hosted on " in software:
- logger.debug(f"software='{software}' has 'hosted on' in it")
+ logger.debug("software='%s' has 'hosted on' in it", software)
software = version.remove(version.strip_hosted_on(software))
elif isinstance(software, str) and " by " in software:
- logger.debug(f"software='{software}' has ' by ' in it")
+ logger.debug("software='%s' has ' by ' in it", software)
software = version.strip_until(software, " by ")
elif isinstance(software, str) and " see " in software:
- logger.debug(f"software='{software}' has ' see ' in it")
+ logger.debug("software='%s' has ' see ' in it", software)
software = version.strip_until(software, " see ")
logger.debug("software='%s' - EXIT!", software)
return None
software = tidyup.domain(software)
- logger.debug("sofware after tidyup.domain():", software)
+ logger.debug("sofware after tidyup.domain(): software='%s'", software)
if software in ["akkoma", "rebased", "akkounfucked", "ched"]:
logger.debug("Setting pleroma:", domain, software)
logger.debug("Setting nextcloud:", domain, software)
software = "nextcloud"
elif software.find("/") > 0:
- logger.warning("Spliting of slash:", software)
+ logger.warning("Spliting of slash: software='%s'", software)
software = tidyup.domain(software.split("/")[-1])
elif software.find("|") > 0:
- logger.warning("Spliting of pipe:", software)
+ logger.warning("Spliting of pipe: software='%s'", software)
software = tidyup.domain(software.split("|")[0])
elif "powered by" in software:
logger.debug(f"software='{software}' has 'powered by' in it")
logger.debug("software[]='%s'", type(software))
if str(software) == "":
- logger.debug(f"software for '{domain}' was not detected, trying generator ...")
+ logger.debug("software for domain='%s' was not detected, trying generator ...", domain)
software = fetch_generator_from_path(domain)
elif len(str(software)) > 0 and ("." in software or " " in software):
- logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
+ logger.debug("software='%s' may contain a version number, domain='%s', removing it ...", software, domain)
software = version.remove(software)
logger.debug("software[]='%s'", type(software))
if isinstance(software, str) and "powered by" in software:
- logger.debug(f"software='{software}' has 'powered by' in it")
+ logger.debug("software='%s' has 'powered by' in it", software)
software = version.remove(version.strip_powered_by(software))
logger.debug("Returning domain,software:", domain, software)
return software
def find_domains(tag: bs4.element.Tag) -> list:
- logger.debug(f"tag[]='{type(tag)}' - CALLED!")
+ logger.debug("tag[]='%s' - CALLED!", type(tag))
if not isinstance(tag, bs4.element.Tag):
raise ValueError(f"Parameter tag[]='{type(tag)}' is not type of bs4.element.Tag")
elif len(tag.select("tr")) == 0:
domains = list()
for element in tag.select("tr"):
- logger.debug(f"element[]='{type(element)}'")
+ logger.debug("element[]='%s'", type(element))
if not element.find("td"):
logger.debug("Skipping element, no <td> found")
continue
logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
continue
- logger.debug(f"Adding domain='{domain}',reason='{reason}' ...")
+ logger.debug("Adding domain='%s',reason='%s' ...", domain, reason)
domains.append({
"domain": domain,
"reason": reason,
})
- logger.debug(f"domains()={len(domains)} - EXIT!")
+ logger.debug("domains()=%d - EXIT!", len(domains))
return domains
def add_peers(rows: dict) -> list:
- logger.debug(f"rows[]={type(rows)} - CALLED!")
+ logger.debug("rows[]='%s' - CALLED!", type(rows))
if not isinstance(rows, dict):
raise ValueError(f"Parameter rows[]='{type(rows)}' is not 'dict'")
peers = list()
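+ # Assumption: rows carries its peer lists under the "linked", "allowed" and "blocked" sections as returned by Friendica-style federation endpoints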
for key in ["linked", "allowed", "blocked"]:
- logger.debug(f"Checking key='{key}'")
+ logger.debug("Checking key='%s'", key)
if key not in rows or rows[key] is None:
- logger.debug(f"Cannot find key='{key}' or it is NoneType - SKIPPED!")
+ logger.debug("Cannot find key='%s' or it is NoneType - SKIPPED!", key)
continue
- logger.debug(f"Adding {len(rows[key])} peer(s) to peers list ...")
+ logger.debug("Adding %d peer(s) to peers list ...", len(rows[key]))
for peer in rows[key]:
- logger.debug(f"peer='{peer}' - BEFORE!")
- if isinstance(peer, dict) and "domain" in peer:
- logger.debug(f"peer[domain]='{peer['domain']}'")
+ logger.debug("peer[%s]='%s' - BEFORE!", type(peer), peer)
+ if peer is None or peer == "":
+ logger.debug("peer is empty - SKIPPED")
+ continue
+ elif isinstance(peer, dict) and "domain" in peer:
+ logger.debug("peer[domain]='%s'", peer['domain'])
peer = tidyup.domain(peer["domain"])
elif isinstance(peer, str):
- logger.debug(f"peer='{peer}'")
+ logger.debug("peer='%s'", peer)
peer = tidyup.domain(peer)
else:
raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'")
- logger.debug(f"peer='{peer}' - AFTER!")
+ logger.debug("peer[%s]='%s' - AFTER!", type(peer), peer)
if not utils.is_domain_wanted(peer):
logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
continue
- logger.debug(f"Adding peer='{peer}' ...")
+ logger.debug("Adding peer='%s' ...", peer)
peers.append(peer)
- logger.debug(f"peers()={len(peers)} - EXIT!")
+ logger.debug("peers()=%d - EXIT!", len(peers))
return peers
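+ # Hypothetical usage sketch: add_peers({"linked": ["one.example"], "allowed": None, "blocked": [{"domain": "two.example"}]})
+ # would return ["one.example", "two.example"], assuming both domains pass utils.is_domain_wanted()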