From c91d2df5d06450801414c1a757f6723c6f4fe78c Mon Sep 17 00:00:00 2001
From: =?utf8?q?Roland=20H=C3=A4der?=
Date: Wed, 21 Jun 2023 22:55:38 +0200
Subject: [PATCH] Continued:

- disabled lint-check no-else-raise
- rewrote more f-masked logger lines to lazy '%'
---
 fba/commands.py         | 40 +++++++++---------
 fba/http/federation.py  | 94 +++++++++++++++++++++--------------------
 fba/models/instances.py |  2 +-
 pylint.rc               |  2 +-
 4 files changed, 71 insertions(+), 67 deletions(-)

diff --git a/fba/commands.py b/fba/commands.py
index 0057fa4..4e74e3f 100644
--- a/fba/commands.py
+++ b/fba/commands.py
@@ -550,7 +550,7 @@ def fetch_fbabot_atom(args: argparse.Namespace):
 
     domains = list()
 
-    logger.info(f"Fetching ATOM feed='{feed}' from FBA bot account ...")
+    logger.info("Fetching ATOM feed='%s' from FBA bot account ...", feed)
     response = utils.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
@@ -586,10 +586,10 @@ def fetch_fbabot_atom(args: argparse.Namespace):
     if len(domains) > 0:
         locking.acquire()
 
-        logger.info(f"Adding {len(domains)} new instances ...")
+        logger.info("Adding %d new instances ...", len(domains))
         for domain in domains:
             try:
-                logger.info(f"Fetching instances from domain='{domain}' ...")
+                logger.info("Fetching instances from domain='%s' ...", domain)
                 federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
 
                 logger.debug("Invoking cookies.clear(%s) ...", domain)
@@ -606,7 +606,7 @@ def fetch_instances(args: argparse.Namespace) -> int:
 
     # Initial fetch
     try:
-        logger.info(f"Fetching instances from args.domain='{args.domain}' ...")
+        logger.info("Fetching instances from args.domain='%s' ...", args.domain)
         federation.fetch_instances(args.domain, None, None, inspect.currentframe().f_code.co_name)
 
         logger.debug(f"Invoking cookies.clear({args.domain}) ...")
@@ -614,7 +614,6 @@ def fetch_instances(args: argparse.Namespace) -> int:
     except network.exceptions as exception:
         logger.warning("Exception '%s' during fetching instances (fetch_instances) from args.domain='%s'", type(exception), args.domain)
         instances.set_last_error(args.domain, exception)
-
         return 100
 
     if args.single:
@@ -629,13 +628,13 @@ def fetch_instances(args: argparse.Namespace) -> int:
     rows = database.cursor.fetchall()
     logger.info("Checking %d entries ...", len(rows))
     for row in rows:
-        logger.debug(f"domain='{row[0]}'")
+        logger.debug("domain='%s'", row[0])
         if blacklist.is_blacklisted(row[0]):
             logger.warning("domain is blacklisted: row[0]='%s'", row[0])
             continue
 
         try:
-            logger.info(f"Fetching instances for instance '{row[0]}' ('{row[2]}') of origin='{row[1]}',nodeinfo_url='{row[3]}'")
+            logger.info("Fetching instances for instance domain='%s',software='%s',origin='%s',nodeinfo_url='%s'", row[0], row[2], row[1], row[3])
             federation.fetch_instances(row[0], row[1], row[2], inspect.currentframe().f_code.co_name, row[3])
 
             logger.debug(f"Invoking cookies.clear({row[0]}) ...")
@@ -696,25 +695,27 @@ def fetch_oliphant(args: argparse.Namespace):
     )
 
     domains = list()
+
+    logger.debug("Downloading %d files ...", len(blocklists))
     for block in blocklists:
         # Is domain given and not equal blocker?
         if isinstance(args.domain, str) and args.domain != block["blocker"]:
-            logger.debug(f"Skipping blocker='{block['blocker']}', not matching args.domain='{args.domain}'")
+            logger.debug("Skipping blocker='%s', not matching args.domain='%s'", block['blocker'], args.domain)
             continue
         elif args.domain in domains:
-            logger.debug(f"args.domain='{args.domain}' already handled - SKIPPED!")
+            logger.debug("args.domain='%s' already handled - SKIPPED!", args.domain)
             continue
 
         # Fetch this URL
-        logger.info(f"Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
+        logger.info("Fetching csv_url='%s' for blocker='%s' ...", block['csv_url'], block['blocker'])
         response = utils.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
         if response.ok and response.content != "":
-            logger.debug(f"Fetched {len(response.content)} Bytes, parsing CSV ...")
+            logger.debug("Fetched %d Bytes, parsing CSV ...", len(response.content))
             reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
 
-            logger.debug(f"reader[]='{type(reader)}'")
+            logger.debug("reader[]='%s'", type(reader))
             for row in reader:
                 domain = None
                 if "#domain" in row:
@@ -725,17 +726,18 @@ def fetch_oliphant(args: argparse.Namespace):
                     logger.debug(f"row='{row}' does not contain domain column")
                     continue
 
+                logger.debug("domain='%s'", domain)
                 if not utils.is_domain_wanted(domain):
                     logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                     continue
 
-                logger.debug(f"Marking domain='{domain}' as handled")
+                logger.debug("Marking domain='%s' as handled", domain)
                 domains.append(domain)
 
-                logger.debug(f"Processing domain='{domain}' ...")
+                logger.debug("Processing domain='%s' ...", domain)
                 processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
 
-                logger.debug(f"processed='{processed}'")
+                logger.debug("processed='%s'", processed)
 
     logger.debug("EXIT!")
 
@@ -748,17 +750,17 @@ def fetch_txt(args: argparse.Namespace):
         "https://seirdy.one/pb/bsl.txt",
     )
 
-    logger.info(f"Checking {len(urls)} text file(s) ...")
+    logger.info("Checking %d text file(s) ...", len(urls))
     for url in urls:
         logger.debug("Fetching url='%s' ...", url)
         response = utils.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
         if response.ok and response.status_code < 300 and response.text != "":
-            logger.debug(f"Returned {len(response.text.strip())} Bytes for processing")
+            logger.debug("Returned %d Bytes for processing", len(response.text.strip()))
             domains = response.text.split("\n")
 
-            logger.info(f"Processing {len(domains)} domains ...")
+            logger.info("Processing %d domains ...", len(domains))
             for domain in domains:
                 logger.debug("domain='%s'", domain)
                 if domain == "":
@@ -771,7 +773,7 @@ def fetch_txt(args: argparse.Namespace):
                 logger.debug("domain='%s'", domain)
                 processed = utils.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
 
-                logger.debug(f"processed='{processed}'")
+                logger.debug("processed='%s'", processed)
                 if not processed:
                     logger.debug(f"domain='{domain}' was not generically processed - SKIPPED!")
                     continue
diff --git a/fba/http/federation.py b/fba/http/federation.py
index 8cfc02a..f9565da 100644
--- a/fba/http/federation.py
+++ b/fba/http/federation.py
@@ -211,14 +211,14 @@ def fetch_peers(domain: str, software: str) -> list:
         else:
             logger.warning("Cannot parse data[json][]='%s'", type(data['json']))
 
-    logger.debug(f"Adding '{len(peers)}' for domain='{domain}'")
+    logger.debug("Adding %d for domain='%s'", len(peers), domain)
     instances.set_total_peers(domain, peers)
 
-    logger.debug("Returning peers[]:", type(peers))
+    logger.debug("Returning peers[]='%s' - EXIT!", type(peers))
     return peers
 
 def fetch_nodeinfo(domain: str, path: str = None) -> dict:
-    logger.debug(f"domain='{domain}',path='{path}' - CALLED!")
+    logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
     if not isinstance(domain, str):
         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
     elif domain == "":
@@ -234,12 +234,12 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
     elif not isinstance(path, str) and path is not None:
         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
 
-    logger.debug(f"Fetching nodeinfo from domain='{domain}' ...")
+    logger.debug("Fetching nodeinfo from domain='%s' ...", domain)
     nodeinfo = fetch_wellknown_nodeinfo(domain)
 
-    logger.debug(f"nodeinfo[{type(nodeinfo)}]({len(nodeinfo)}='{nodeinfo}'")
+    logger.debug("nodeinfo[%s](%d)='%s'", type(nodeinfo), len(nodeinfo), nodeinfo)
     if "error_message" not in nodeinfo and "json" in nodeinfo and len(nodeinfo["json"]) > 0:
-        logger.debug(f"Found nodeinfo[json]()={len(nodeinfo['json'])} - EXIT!")
+        logger.debug("Found nodeinfo[json]()=%d - EXIT!", len(nodeinfo['json']))
         return nodeinfo["json"]
 
     # No CSRF by default, you don't have to add network.api_headers by yourself here
@@ -250,7 +250,7 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
         logger.debug("Checking CSRF for domain='%s'", domain)
         headers = csrf.determine(domain, dict())
     except network.exceptions as exception:
-        logger.warning(f"Exception '{type(exception)}' during checking CSRF (nodeinfo,{__name__}) - EXIT!")
+        logger.warning("Exception '%s' during checking CSRF (nodeinfo,%s) - EXIT!", type(exception), __name__)
         instances.set_last_error(domain, exception)
         return {
             "status_code" : 500,
@@ -270,7 +270,7 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
     for request in request_paths:
         logger.debug(f"path[{type(path)}]='{path}',request='{request}'")
         if path is None or path == request or path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
-            logger.debug(f"Fetching request='{request}' from domain='{domain}' ...")
+            logger.debug("Fetching request='%s' from domain='%s' ...", request, domain)
             if path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
                 logger.debug(f"domain='{domain}',path='{path}' has protocol in path, splitting ...")
                 components = urlparse(path)
@@ -339,17 +339,17 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict:
     if "links" in nodeinfo:
         logger.debug("Found links in nodeinfo():", len(nodeinfo["links"]))
         for link in nodeinfo["links"]:
-            logger.debug(f"link[{type(link)}]='{link}'")
+            logger.debug("link[%s]='%s'", type(link), link)
             if not isinstance(link, dict) or not "rel" in link:
-                logger.warning(f"link[]='{type(link)}' is not 'dict' or no element 'rel' found")
+                logger.warning("link[]='%s' is not 'dict' or no element 'rel' found", type(link))
             elif link["rel"] in nodeinfo_identifier:
                 # Default is that 'href' has a complete URL, but some hosts don't send that
                 url = link["href"]
                 components = urlparse(link["href"])
 
-                logger.debug(f"components[{type(components)}]='{components}'")
+                logger.debug("components[%s]='%s'", type(components), components)
                 if components.scheme == "" and components.netloc == "":
-                    logger.debug(f"link[href]='{link['href']}' has no scheme and host name in it, prepending from domain='{domain}'")
+                    logger.debug("link[href]='%s' has no scheme and host name in it, prepending from domain='%s'", link['href'], domain)
                     url = f"https://{domain}{url}"
                     components = urlparse(url)
 
@@ -365,16 +365,16 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict:
                 logger.debug("href,data[]:", link["href"], type(data))
 
                 if "error_message" not in data and "json" in data:
-                    logger.debug("Found JSON nodeinfo():", len(data))
+                    logger.debug("Found JSON nodeinfo()=%d", len(data))
                     instances.set_detection_mode(domain, "AUTO_DISCOVERY")
                     instances.set_nodeinfo_url(domain, link["href"])
                     break
                 else:
                     instances.set_last_error(domain, data)
         else:
-            logger.warning("Unknown 'rel' value:", domain, link["rel"])
+            logger.warning("Unknown 'rel' value: domain='%s',link[rel]='%s'", domain, link["rel"])
     else:
-        logger.warning("nodeinfo does not contain 'links':", domain)
+        logger.warning("nodeinfo does not contain 'links': domain='%s'", domain)
 
     logger.debug("Returning data[]:", type(data))
     return data
@@ -398,19 +398,18 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str:
     elif path == "":
         raise ValueError("Parameter 'path' is empty")
 
-    logger.debug(f"domain='{domain}',path='{path}' - CALLED!")
+    logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
     software = None
 
-    logger.debug(f"Fetching path='{path}' from '{domain}' ...")
+    logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
     response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
     if response.ok and response.status_code < 300 and response.text.find("<html") > 0:
-        logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
-
+        logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
         doc = bs4.BeautifulSoup(response.text, "html.parser")
 
-        logger.debug("doc[]='%s'", type(doc))
+        logger.debug("doc[]='%s'", type(doc))
         generator = doc.find("meta", {"name"    : "generator"})
         site_name = doc.find("meta", {"property": "og:site_name"})
@@ -437,21 +436,21 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str:
         logger.debug("Corrected empty string to None for software of domain='%s'", domain)
         software = None
     elif isinstance(software, str) and ("." in software or " " in software):
-        logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
+        logger.debug("software='%s' may contain a version number, domain='%s', removing it ...", software, domain)
         software = version.remove(software)
 
     logger.debug("software[]='%s'", type(software))
     if isinstance(software, str) and "powered by " in software:
-        logger.debug(f"software='{software}' has 'powered by' in it")
+        logger.debug("software='%s' has 'powered by' in it", software)
         software = version.remove(version.strip_powered_by(software))
     elif isinstance(software, str) and " hosted on " in software:
-        logger.debug(f"software='{software}' has 'hosted on' in it")
+        logger.debug("software='%s' has 'hosted on' in it", software)
         software = version.remove(version.strip_hosted_on(software))
     elif isinstance(software, str) and " by " in software:
-        logger.debug(f"software='{software}' has ' by ' in it")
+        logger.debug("software='%s' has ' by ' in it", software)
         software = version.strip_until(software, " by ")
     elif isinstance(software, str) and " see " in software:
-        logger.debug(f"software='{software}' has ' see ' in it")
+        logger.debug("software='%s' has ' see ' in it", software)
         software = version.strip_until(software, " see ")
 
     logger.debug("software='%s' - EXIT!", software)
@@ -508,7 +507,7 @@ def determine_software(domain: str, path: str = None) -> str:
         return None
 
     software = tidyup.domain(software)
-    logger.debug("sofware after tidyup.domain():", software)
+    logger.debug("software after tidyup.domain(): software='%s'", software)
 
     if software in ["akkoma", "rebased", "akkounfucked", "ched"]:
         logger.debug("Setting pleroma:", domain, software)
@@ -526,10 +525,10 @@ def determine_software(domain: str, path: str = None) -> str:
         logger.debug("Setting nextcloud:", domain, software)
         software = "nextcloud"
     elif software.find("/") > 0:
-        logger.warning("Spliting of slash:", software)
+        logger.warning("Splitting of slash: software='%s'", software)
         software = tidyup.domain(software.split("/")[-1])
     elif software.find("|") > 0:
-        logger.warning("Spliting of pipe:", software)
+        logger.warning("Splitting of pipe: software='%s'", software)
         software = tidyup.domain(software.split("|")[0])
     elif "powered by" in software:
         logger.debug(f"software='{software}' has 'powered by' in it")
@@ -548,22 +547,22 @@ def determine_software(domain: str, path: str = None) -> str:
 
     logger.debug("software[]='%s'", type(software))
     if str(software) == "":
-        logger.debug(f"software for '{domain}' was not detected, trying generator ...")
+        logger.debug("software for domain='%s' was not detected, trying generator ...", domain)
         software = fetch_generator_from_path(domain)
     elif len(str(software)) > 0 and ("." in software or " " in software):
-        logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
+        logger.debug("software='%s' may contain a version number, domain='%s', removing it ...", software, domain)
         software = version.remove(software)
 
     logger.debug("software[]='%s'", type(software))
     if isinstance(software, str) and "powered by" in software:
-        logger.debug(f"software='{software}' has 'powered by' in it")
+        logger.debug("software='%s' has 'powered by' in it", software)
         software = version.remove(version.strip_powered_by(software))
 
     logger.debug("Returning domain,software:", domain, software)
     return software
 
 def find_domains(tag: bs4.element.Tag) -> list:
-    logger.debug(f"tag[]='{type(tag)}' - CALLED!")
+    logger.debug("tag[]='%s' - CALLED!", type(tag))
     if not isinstance(tag, bs4.element.Tag):
         raise ValueError(f"Parameter tag[]='{type(tag)}' is not type of bs4.element.Tag")
     elif len(tag.select("tr")) == 0:
@@ -571,7 +570,7 @@ def find_domains(tag: bs4.element.Tag) -> list:
 
     domains = list()
     for element in tag.select("tr"):
-        logger.debug(f"element[]='{type(element)}'")
+        logger.debug("element[]='%s'", type(element))
         if not element.find("td"):
             logger.debug("Skipping element, no <td> found")
             continue
@@ -603,46 +602,49 @@ def find_domains(tag: bs4.element.Tag) -> list:
             logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
             continue
 
-        logger.debug(f"Adding domain='{domain}',reason='{reason}' ...")
+        logger.debug("Adding domain='%s',reason='%s' ...", domain, reason)
         domains.append({
             "domain": domain,
             "reason": reason,
         })
 
-    logger.debug(f"domains()={len(domains)} - EXIT!")
+    logger.debug("domains()=%d - EXIT!", len(domains))
     return domains
 
 def add_peers(rows: dict) -> list:
-    logger.debug(f"rows[]={type(rows)} - CALLED!")
+    logger.debug("rows[]='%s' - CALLED!", type(rows))
     if not isinstance(rows, dict):
         raise ValueError(f"Parameter rows[]='{type(rows)}' is not 'dict'")
 
     peers = list()
     for key in ["linked", "allowed", "blocked"]:
-        logger.debug(f"Checking key='{key}'")
+        logger.debug("Checking key='%s'", key)
         if key not in rows or rows[key] is None:
-            logger.debug(f"Cannot find key='{key}' or it is NoneType - SKIPPED!")
+            logger.debug("Cannot find key='%s' or it is NoneType - SKIPPED!", key)
             continue
 
-        logger.debug(f"Adding {len(rows[key])} peer(s) to peers list ...")
+        logger.debug("Adding %d peer(s) to peers list ...", len(rows[key]))
         for peer in rows[key]:
-            logger.debug(f"peer='{peer}' - BEFORE!")
-            if isinstance(peer, dict) and "domain" in peer:
-                logger.debug(f"peer[domain]='{peer['domain']}'")
+            logger.debug("peer[%s]='%s' - BEFORE!", type(peer), peer)
+            if peer is None or peer == "":
+                logger.debug("peer is empty - SKIPPED")
+                continue
+            elif isinstance(peer, dict) and "domain" in peer:
+                logger.debug("peer[domain]='%s'", peer['domain'])
                 peer = tidyup.domain(peer["domain"])
             elif isinstance(peer, str):
-                logger.debug(f"peer='{peer}'")
+                logger.debug("peer='%s'", peer)
                 peer = tidyup.domain(peer)
             else:
                 raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'")
 
-            logger.debug(f"peer='{peer}' - AFTER!")
+            logger.debug("peer[%s]='%s' - AFTER!", type(peer), peer)
             if not utils.is_domain_wanted(peer):
                 logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
                 continue
 
-            logger.debug(f"Adding peer='{peer}' ...")
+            logger.debug("Adding peer='%s' ...", peer)
             peers.append(peer)
 
-    logger.debug(f"peers()={len(peers)} - EXIT!")
+    logger.debug("peers()=%d - EXIT!", len(peers))
     return peers
diff --git a/fba/models/instances.py b/fba/models/instances.py
index d3f7f43..09f1f82 100644
--- a/fba/models/instances.py
+++ b/fba/models/instances.py
@@ -226,7 +226,7 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str
         logger.warning("Exception '%s' during determining software type, domain='%s'", type(exception), domain)
         set_last_error(domain, exception)
 
-    logger.debug("Determined software:", software)
+    logger.debug("Determined software='%s'", software)
     if software == "lemmy" and domain.find("/c/") > 0:
         domain = domain.split("/c/")[0]
         if is_registered(domain):
diff --git a/pylint.rc b/pylint.rc
index 215f280..e57bed4 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -60,7 +60,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use "--disable=all --enable=classes
 # --disable=W".
-disable=anomalous-backslash-in-string, duplicate-code
+disable=anomalous-backslash-in-string,duplicate-code,no-else-raise
 
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
-- 
2.39.5
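
Reviewer note on the two changes above (annotation, not part of the patch):

The lazy '%' rewrite addresses pylint's logging-fstring-interpolation
(W1203). An f-string is interpolated before logger.debug() is even entered,
while '%'-style arguments are stored on the log record and only formatted if
a handler actually emits it. A minimal sketch of the difference (the
Expensive class is hypothetical, made up for illustration):

    import logging

    logger = logging.getLogger(__name__)  # default effective level WARNING: DEBUG records are discarded

    class Expensive:
        """Stands in for a value whose string form is costly to build."""
        def __str__(self) -> str:
            print("__str__ was called")
            return "expensive"

    value = Expensive()

    # f-string: the message is built unconditionally, even though the
    # DEBUG record is discarded.
    logger.debug(f"value='{value}'")

    # lazy '%': only the template and argument are stored; __str__ never
    # runs unless the record is actually emitted.
    logger.debug("value='%s'", value)

Note that the arguments themselves are still evaluated eagerly in both
forms; the saving is in the string conversion and interpolation.

Disabling no-else-raise (R1720) stops pylint from demanding that elif/else
branches following a raise be dedented. The parameter-validation chains in
fetch_nodeinfo() above are the pattern being kept, e.g. (second raise
paraphrased from the parallel 'path' check in the same function):

    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")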