]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Wed, 21 Jun 2023 20:55:38 +0000 (22:55 +0200)
committerRoland Häder <roland@mxchange.org>
Wed, 21 Jun 2023 20:55:38 +0000 (22:55 +0200)
- disabled lint-check no-else-raise
- rewrote more f-masked logger lines to lazy '%'

fba/commands.py
fba/http/federation.py
fba/models/instances.py
pylint.rc

index 0057fa4b079249881f4da2f3697332e34957ff8c..4e74e3f6c2f61350e2256ed1611d7889d7234f81 100644 (file)
@@ -550,7 +550,7 @@ def fetch_fbabot_atom(args: argparse.Namespace):
 
     domains = list()
 
-    logger.info(f"Fetching ATOM feed='{feed}' from FBA bot account ...")
+    logger.info("Fetching ATOM feed='%s' from FBA bot account ...", feed)
     response = utils.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
@@ -586,10 +586,10 @@ def fetch_fbabot_atom(args: argparse.Namespace):
     if len(domains) > 0:
         locking.acquire()
 
-        logger.info(f"Adding {len(domains)} new instances ...")
+        logger.info("Adding %d new instances ...", len(domains))
         for domain in domains:
             try:
-                logger.info(f"Fetching instances from domain='{domain}' ...")
+                logger.info("Fetching instances from domain='%s' ...", domain)
                 federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
 
                 logger.debug("Invoking cookies.clear(%s) ...", domain)
@@ -606,7 +606,7 @@ def fetch_instances(args: argparse.Namespace) -> int:
 
     # Initial fetch
     try:
-        logger.info(f"Fetching instances from args.domain='{args.domain}' ...")
+        logger.info("Fetching instances from args.domain='%s' ...", args.domain)
         federation.fetch_instances(args.domain, None, None, inspect.currentframe().f_code.co_name)
 
         logger.debug(f"Invoking cookies.clear({args.domain}) ...")
@@ -614,7 +614,6 @@ def fetch_instances(args: argparse.Namespace) -> int:
     except network.exceptions as exception:
         logger.warning("Exception '%s' during fetching instances (fetch_instances) from args.domain='%s'", type(exception), args.domain)
         instances.set_last_error(args.domain, exception)
-
         return 100
 
     if args.single:
@@ -629,13 +628,13 @@ def fetch_instances(args: argparse.Namespace) -> int:
     rows = database.cursor.fetchall()
     logger.info("Checking %d entries ...", len(rows))
     for row in rows:
-        logger.debug(f"domain='{row[0]}'")
+        logger.debug("domain='%s'", row[0])
         if blacklist.is_blacklisted(row[0]):
             logger.warning("domain is blacklisted: row[0]='%s'", row[0])
             continue
 
         try:
-            logger.info(f"Fetching instances for instance '{row[0]}' ('{row[2]}') of origin='{row[1]}',nodeinfo_url='{row[3]}'")
+            logger.info("Fetching instances for instance domain='%s',software='%s',origin='%s',nodeinfo_url='%s'", row[0], row[2], row[1], row[3])
             federation.fetch_instances(row[0], row[1], row[2], inspect.currentframe().f_code.co_name, row[3])
 
             logger.debug(f"Invoking cookies.clear({row[0]}) ...")
@@ -696,25 +695,27 @@ def fetch_oliphant(args: argparse.Namespace):
     )
 
     domains = list()
+
+    logger.debug("Downloading %d files ...", len(blocklists))
     for block in blocklists:
         # Is domain given and not equal blocker?
         if isinstance(args.domain, str) and args.domain != block["blocker"]:
-            logger.debug(f"Skipping blocker='{block['blocker']}', not matching args.domain='{args.domain}'")
+            logger.debug("Skipping blocker='%s', not matching args.domain='%s'", block['blocker'], args.domain)
             continue
         elif args.domain in domains:
-            logger.debug(f"args.domain='{args.domain}' already handled - SKIPPED!")
+            logger.debug("args.domain='%s' already handled - SKIPPED!", args.domain)
             continue
 
         # Fetch this URL
-        logger.info(f"Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
+        logger.info("Fetching csv_url='%s' for blocker='%s' ...", block['csv_url'], block['blocker'])
         response = utils.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
         if response.ok and response.content != "":
-            logger.debug(f"Fetched {len(response.content)} Bytes, parsing CSV ...")
+            logger.debug("Fetched %d Bytes, parsing CSV ...", len(response.content))
             reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
 
-            logger.debug(f"reader[]='{type(reader)}'")
+            logger.debug("reader[]='%s'", type(reader))
             for row in reader:
                 domain = None
                 if "#domain" in row:
@@ -725,17 +726,18 @@ def fetch_oliphant(args: argparse.Namespace):
                     logger.debug(f"row='{row}' does not contain domain column")
                     continue
 
+                logger.debug("domain='%s'", domain)
                 if not utils.is_domain_wanted(domain):
                     logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                     continue
 
-                logger.debug(f"Marking domain='{domain}' as handled")
+                logger.debug("Marking domain='%s' as handled", domain)
                 domains.append(domain)
 
-                logger.debug(f"Processing domain='{domain}' ...")
+                logger.debug("Processing domain='%s' ...", domain)
                 processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
 
-                logger.debug(f"processed='{processed}'")
+                logger.debug("processed='%s'", processed)
 
     logger.debug("EXIT!")
 
@@ -748,17 +750,17 @@ def fetch_txt(args: argparse.Namespace):
         "https://seirdy.one/pb/bsl.txt",
     )
 
-    logger.info(f"Checking {len(urls)} text file(s) ...")
+    logger.info("Checking %d text file(s) ...", len(urls))
     for url in urls:
         logger.debug("Fetching url='%s' ...", url)
         response = utils.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
         if response.ok and response.status_code < 300 and response.text != "":
-            logger.debug(f"Returned {len(response.text.strip())} Bytes for processing")
+            logger.debug("Returned %d Bytes for processing", len(response.text.strip()))
             domains = response.text.split("\n")
 
-            logger.info(f"Processing {len(domains)} domains ...")
+            logger.info("Processing %d domains ...", len(domains))
             for domain in domains:
                 logger.debug("domain='%s'", domain)
                 if domain == "":
@@ -771,7 +773,7 @@ def fetch_txt(args: argparse.Namespace):
                 logger.debug("domain='%s'", domain)
                 processed = utils.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
 
-                logger.debug(f"processed='{processed}'")
+                logger.debug("processed='%s'", processed)
                 if not processed:
                     logger.debug(f"domain='{domain}' was not generically processed - SKIPPED!")
                     continue
index 8cfc02acbe4661f83e08927e61de2245024162b7..f9565da222e1cb54ee3f488dcbbb02821a633d14 100644 (file)
@@ -211,14 +211,14 @@ def fetch_peers(domain: str, software: str) -> list:
     else:
         logger.warning("Cannot parse data[json][]='%s'", type(data['json']))
 
-    logger.debug(f"Adding '{len(peers)}' for domain='{domain}'")
+    logger.debug("Adding %d for domain='%s'", len(peers), domain)
     instances.set_total_peers(domain, peers)
 
-    logger.debug("Returning peers[]:", type(peers))
+    logger.debug("Returning peers[]='%s' - EXIT!", type(peers))
     return peers
 
 def fetch_nodeinfo(domain: str, path: str = None) -> dict:
-    logger.debug(f"domain='{domain}',path='{path}' - CALLED!")
+    logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
     if not isinstance(domain, str):
         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
     elif domain == "":
@@ -234,12 +234,12 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
     elif not isinstance(path, str) and path is not None:
         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
 
-    logger.debug(f"Fetching nodeinfo from domain='{domain}' ...")
+    logger.debug("Fetching nodeinfo from domain='%s' ...", domain)
     nodeinfo = fetch_wellknown_nodeinfo(domain)
 
-    logger.debug(f"nodeinfo[{type(nodeinfo)}]({len(nodeinfo)}='{nodeinfo}'")
+    logger.debug("nodeinfo[%s]({len(nodeinfo)}='%s'", type(nodeinfo), nodeinfo)
     if "error_message" not in nodeinfo and "json" in nodeinfo and len(nodeinfo["json"]) > 0:
-        logger.debug(f"Found nodeinfo[json]()={len(nodeinfo['json'])} - EXIT!")
+        logger.debug("Found nodeinfo[json]()=%d - EXIT!", len(nodeinfo['json']))
         return nodeinfo["json"]
 
     # No CSRF by default, you don't have to add network.api_headers by yourself here
@@ -250,7 +250,7 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
         logger.debug("Checking CSRF for domain='%s'", domain)
         headers = csrf.determine(domain, dict())
     except network.exceptions as exception:
-        logger.warning(f"Exception '{type(exception)}' during checking CSRF (nodeinfo,{__name__}) - EXIT!")
+        logger.warning("Exception '%s' during checking CSRF (nodeinfo,%s) - EXIT!", type(exception), __name__)
         instances.set_last_error(domain, exception)
         return {
             "status_code"  : 500,
@@ -270,7 +270,7 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
     for request in request_paths:
         logger.debug(f"path[{type(path)}]='{path}',request='{request}'")
         if path is None or path == request or path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
-            logger.debug(f"Fetching request='{request}' from domain='{domain}' ...")
+            logger.debug("Fetching request='%s' from domain='%s' ...", request, domain)
             if path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
                 logger.debug(f"domain='{domain}',path='{path}' has protocol in path, splitting ...")
                 components = urlparse(path)
@@ -339,17 +339,17 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict:
         if "links" in nodeinfo:
             logger.debug("Found links in nodeinfo():", len(nodeinfo["links"]))
             for link in nodeinfo["links"]:
-                logger.debug(f"link[{type(link)}]='{link}'")
+                logger.debug("link[%s]='%s'", type(link), link)
                 if not isinstance(link, dict) or not "rel" in link:
-                    logger.warning(f"link[]='{type(link)}' is not 'dict' or no element 'rel' found")
+                    logger.warning("link[]='%s' is not 'dict' or no element 'rel' found", type(link))
                 elif link["rel"] in nodeinfo_identifier:
                     # Default is that 'href' has a complete URL, but some hosts don't send that
                     url = link["href"]
                     components = urlparse(link["href"])
 
-                    logger.debug(f"components[{type(components)}]='{components}'")
+                    logger.debug("components[%s]='%s'", type(components), components)
                     if components.scheme == "" and components.netloc == "":
-                        logger.debug(f"link[href]='{link['href']}' has no scheme and host name in it, prepending from domain='{domain}'")
+                        logger.debug("link[href]='%s' has no scheme and host name in it, prepending from domain='%s'", link['href'], domain)
                         url = f"https://{domain}{url}"
                         components = urlparse(url)
 
@@ -365,16 +365,16 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict:
 
                     logger.debug("href,data[]:", link["href"], type(data))
                     if "error_message" not in data and "json" in data:
-                        logger.debug("Found JSON nodeinfo():", len(data))
+                        logger.debug("Found JSON nodeinfo()=%d", len(data))
                         instances.set_detection_mode(domain, "AUTO_DISCOVERY")
                         instances.set_nodeinfo_url(domain, link["href"])
                         break
                     else:
                         instances.set_last_error(domain, data)
                 else:
-                    logger.warning("Unknown 'rel' value:", domain, link["rel"])
+                    logger.warning("Unknown 'rel' value: domain='%s',link[rel]='%s'", domain, link["rel"])
         else:
-            logger.warning("nodeinfo does not contain 'links':", domain)
+            logger.warning("nodeinfo does not contain 'links': domain='%s'", domain)
 
     logger.debug("Returning data[]:", type(data))
     return data
@@ -398,19 +398,18 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str:
     elif path == "":
         raise ValueError("Parameter 'path' is empty")
 
-    logger.debug(f"domain='{domain}',path='{path}' - CALLED!")
+    logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
     software = None
 
-    logger.debug(f"Fetching path='{path}' from '{domain}' ...")
+    logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
     response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
     if response.ok and response.status_code < 300 and response.text.find("<html") > 0:
-        logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
-
+        logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
         doc = bs4.BeautifulSoup(response.text, "html.parser")
-        logger.debug("doc[]='%s'", type(doc))
 
+        logger.debug("doc[]='%s'", type(doc))
         generator = doc.find("meta", {"name"    : "generator"})
         site_name = doc.find("meta", {"property": "og:site_name"})
 
@@ -437,21 +436,21 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str:
         logger.debug("Corrected empty string to None for software of domain='%s'", domain)
         software = None
     elif isinstance(software, str) and ("." in software or " " in software):
-        logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
+        logger.debug("software='%s' may contain a version number, domain='{domain}', removing it ...", software)
         software = version.remove(software)
 
     logger.debug("software[]='%s'", type(software))
     if isinstance(software, str) and "powered by " in software:
-        logger.debug(f"software='{software}' has 'powered by' in it")
+        logger.debug("software='%s' has 'powered by' in it", software)
         software = version.remove(version.strip_powered_by(software))
     elif isinstance(software, str) and " hosted on " in software:
-        logger.debug(f"software='{software}' has 'hosted on' in it")
+        logger.debug("software='%s' has 'hosted on' in it", software)
         software = version.remove(version.strip_hosted_on(software))
     elif isinstance(software, str) and " by " in software:
-        logger.debug(f"software='{software}' has ' by ' in it")
+        logger.debug("software='%s' has ' by ' in it", software)
         software = version.strip_until(software, " by ")
     elif isinstance(software, str) and " see " in software:
-        logger.debug(f"software='{software}' has ' see ' in it")
+        logger.debug("software='%s' has ' see ' in it", software)
         software = version.strip_until(software, " see ")
 
     logger.debug("software='%s' - EXIT!", software)
@@ -508,7 +507,7 @@ def determine_software(domain: str, path: str = None) -> str:
         return None
 
     software = tidyup.domain(software)
-    logger.debug("sofware after tidyup.domain():", software)
+    logger.debug("sofware after tidyup.domain(): software='%s'", software)
 
     if software in ["akkoma", "rebased", "akkounfucked", "ched"]:
         logger.debug("Setting pleroma:", domain, software)
@@ -526,10 +525,10 @@ def determine_software(domain: str, path: str = None) -> str:
         logger.debug("Setting nextcloud:", domain, software)
         software = "nextcloud"
     elif software.find("/") > 0:
-        logger.warning("Spliting of slash:", software)
+        logger.warning("Spliting of slash: software='%s'", software)
         software = tidyup.domain(software.split("/")[-1])
     elif software.find("|") > 0:
-        logger.warning("Spliting of pipe:", software)
+        logger.warning("Spliting of pipe: software='%s'", software)
         software = tidyup.domain(software.split("|")[0])
     elif "powered by" in software:
         logger.debug(f"software='{software}' has 'powered by' in it")
@@ -548,22 +547,22 @@ def determine_software(domain: str, path: str = None) -> str:
 
     logger.debug("software[]='%s'", type(software))
     if str(software) == "":
-        logger.debug(f"software for '{domain}' was not detected, trying generator ...")
+        logger.debug("software for domain='%s' was not detected, trying generator ...", domain)
         software = fetch_generator_from_path(domain)
     elif len(str(software)) > 0 and ("." in software or " " in software):
-        logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
+        logger.debug("software='%s' may contain a version number, domain='%s', removing it ...", software, domain)
         software = version.remove(software)
 
     logger.debug("software[]='%s'", type(software))
     if isinstance(software, str) and "powered by" in software:
-        logger.debug(f"software='{software}' has 'powered by' in it")
+        logger.debug("software='%s' has 'powered by' in it", software)
         software = version.remove(version.strip_powered_by(software))
 
     logger.debug("Returning domain,software:", domain, software)
     return software
 
 def find_domains(tag: bs4.element.Tag) -> list:
-    logger.debug(f"tag[]='{type(tag)}' - CALLED!")
+    logger.debug("tag[]='%s' - CALLED!", type(tag))
     if not isinstance(tag, bs4.element.Tag):
         raise ValueError(f"Parameter tag[]='{type(tag)}' is not type of bs4.element.Tag")
     elif len(tag.select("tr")) == 0:
@@ -571,7 +570,7 @@ def find_domains(tag: bs4.element.Tag) -> list:
 
     domains = list()
     for element in tag.select("tr"):
-        logger.debug(f"element[]='{type(element)}'")
+        logger.debug("element[]='%s'", type(element))
         if not element.find("td"):
             logger.debug("Skipping element, no <td> found")
             continue
@@ -603,46 +602,49 @@ def find_domains(tag: bs4.element.Tag) -> list:
             logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
             continue
 
-        logger.debug(f"Adding domain='{domain}',reason='{reason}' ...")
+        logger.debug("Adding domain='%s',reason='%s' ...", domain, reason)
         domains.append({
             "domain": domain,
             "reason": reason,
         })
 
-    logger.debug(f"domains()={len(domains)} - EXIT!")
+    logger.debug("domains()=%d - EXIT!", len(domains))
     return domains
 
 def add_peers(rows: dict) -> list:
-    logger.debug(f"rows[]={type(rows)} - CALLED!")
+    logger.debug("rows[]='%s' - CALLED!", type(rows))
     if not isinstance(rows, dict):
         raise ValueError(f"Parameter rows[]='{type(rows)}' is not 'dict'")
 
     peers = list()
     for key in ["linked", "allowed", "blocked"]:
-        logger.debug(f"Checking key='{key}'")
+        logger.debug("Checking key='%s'", key)
         if key not in rows or rows[key] is None:
-            logger.debug(f"Cannot find key='{key}' or it is NoneType - SKIPPED!")
+            logger.debug("Cannot find key='%s' or it is NoneType - SKIPPED!", key)
             continue
 
-        logger.debug(f"Adding {len(rows[key])} peer(s) to peers list ...")
+        logger.debug("Adding %d peer(s) to peers list ...", len(rows[key]))
         for peer in rows[key]:
-            logger.debug(f"peer='{peer}' - BEFORE!")
-            if isinstance(peer, dict) and "domain" in peer:
-                logger.debug(f"peer[domain]='{peer['domain']}'")
+            logger.debug("peer[%s]='%s' - BEFORE!", type(peer), peer)
+            if peer is None or peer == "":
+                logger.debug("peer is empty - SKIPPED")
+                continue
+            elif isinstance(peer, dict) and "domain" in peer:
+                logger.debug("peer[domain]='%s'", peer['domain'])
                 peer = tidyup.domain(peer["domain"])
             elif isinstance(peer, str):
-                logger.debug(f"peer='{peer}'")
+                logger.debug("peer='%s'", peer)
                 peer = tidyup.domain(peer)
             else:
                 raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'")
 
-            logger.debug(f"peer='{peer}' - AFTER!")
+            logger.debug("peer[%s]='%s' - AFTER!", type(peer), peer)
             if not utils.is_domain_wanted(peer):
                 logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
                 continue
 
-            logger.debug(f"Adding peer='{peer}' ...")
+            logger.debug("Adding peer='%s' ...", peer)
             peers.append(peer)
 
-    logger.debug(f"peers()={len(peers)} - EXIT!")
+    logger.debug("peers()=%d - EXIT!", len(peers))
     return peers
index d3f7f43bc6b1ebcef2e0d7cea26190b5e6df6a86..09f1f821a6ebc8560ca1bf2a77efc6a089785517 100644 (file)
@@ -226,7 +226,7 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str
             logger.warning("Exception '%s' during determining software type, domain='%s'", type(exception), domain)
             set_last_error(domain, exception)
 
-    logger.debug("Determined software:", software)
+    logger.debug("Determined software='%s'", software)
     if software == "lemmy" and domain.find("/c/") > 0:
         domain = domain.split("/c/")[0]
         if is_registered(domain):
index 215f2808368449c0f8956f008a3420036bf365d5..e57bed4e00060ddfb47c74032665857cb0088ad8 100644 (file)
--- a/pylint.rc
+++ b/pylint.rc
@@ -60,7 +60,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use "--disable=all --enable=classes
 # --disable=W".
-disable=anomalous-backslash-in-string, duplicate-code
+disable=anomalous-backslash-in-string,duplicate-code,no-else-raise
 
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option