]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Thu, 25 May 2023 17:01:26 +0000 (19:01 +0200)
committerRoland Häder <roland@mxchange.org>
Thu, 25 May 2023 17:01:26 +0000 (19:01 +0200)
- encapsulated deleting of pending errors in remove_pending_error()
- introduced fetch_generator_from_path() and used it as last fall-back

fba.py

diff --git a/fba.py b/fba.py
index 27ee3bac6f52411477ad86f95ee8f7686c6e3a50..b4714dd7b61dbea1dd8e012e5b2d3f9d8f2a1c4b 100644 (file)
--- a/fba.py
+++ b/fba.py
@@ -96,6 +96,14 @@ def is_blacklisted(domain: str) -> bool:
 
     return blacklisted
 
+def remove_pending_error(domain: str):
+    try:
+        # Prevent updating any pending errors, nodeinfo was found
+        del pending_errors[domain]
+
+    except:
+        pass
+
 def get_hash(domain: str) -> str:
     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
 
@@ -278,6 +286,7 @@ def get_peers(domain: str, software: str) -> list:
     try:
         res = reqto.get(f"https://{domain}{get_peers_url}", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
 
+        # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
         if not res.ok or res.status_code >= 400:
             res = reqto.get(f"https://{domain}/api/v3/site", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
 
@@ -288,7 +297,7 @@ def get_peers(domain: str, software: str) -> list:
                 print("WARNING: Could not reach any JSON API:", domain)
                 update_last_error(domain, res)
         else:
-            # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
+            # NOISY-DEBUG: print("DEBUG:Querying API was successful:", domain, len(res.json()))
             peers = res.json()
             nodeinfos["get_peers_url"][domain] = get_peers_url
 
@@ -307,6 +316,7 @@ def post_json_api(domain: str, path: str, data: str) -> list:
     try:
         res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
 
+        # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
         if not res.ok or res.status_code >= 400:
             print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
             update_last_error(domain, res)
@@ -324,7 +334,7 @@ def fetch_nodeinfo(domain: str) -> list:
     # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
 
     nodeinfo = fetch_wellknown_nodeinfo(domain)
-    # NOISY-DEBUG: print("DEBUG: nodeinfo:", len(nodeinfo))
+    # NOISY-DEBUG: print("DEBUG:nodeinfo:", len(nodeinfo))
 
     if len(nodeinfo) > 0:
         # NOISY-DEBUG: print("DEBUG: Returning auto-discovered nodeinfo:", len(nodeinfo))
@@ -362,10 +372,6 @@ def fetch_nodeinfo(domain: str) -> list:
             update_last_error(domain, e)
             pass
 
-    # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
-    if not isinstance(json, dict) or len(json) == 0:
-        print("WARNING: Failed fetching nodeinfo from domain:", domain)
-
     # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
     return json
 
@@ -407,51 +413,64 @@ def fetch_wellknown_nodeinfo(domain: str) -> list:
     # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
     return json
 
+def fetch_generator_from_path(domain: str, path: str = "/") -> str:
+    # NOISY-DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!")
+    software = None
+
+    try:
+        # NOISY-DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' ...")
+        res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+
+        # NOISY-DEBUG: print("DEBUG: domain,res.ok,res.status_code:", domain, res.ok, res.status_code)
+        if res.ok and res.status_code < 300 and len(res.text) > 0:
+            # NOISY-DEBUG: print("DEBUG: Search for <meta name='generator'>:", domain)
+            doc = bs4.BeautifulSoup(res.text, "html.parser")
+
+            # NOISY-DEBUG: print("DEBUG: doc[]:", type(doc))
+            tag = doc.find("meta", {"name": "generator"})
+
+            # NOISY-DEBUG: print(f"DEBUG: tag[{type(tag)}: {tag}")
+            if isinstance(tag, bs4.element.Tag):
+                # NOISY-DEBUG: print("DEBUG: Found generator meta tag: ", domain)
+                software = tidyup(tag.get("content"))
+                # NOISY-DEBUG: print(f"DEBUG: software='{software}'")
+                remove_pending_error(domain)
+
+    except BaseException as e:
+        print(f"WARNING: Cannot fetch / from '{domain}':", e)
+        update_last_error(domain, e)
+        pass
+
+    # NOISY-DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
+    return software
+
 def determine_software(domain: str) -> str:
     # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
     software = None
 
+    # NOISY-DEBUG: print(f"DEBUG: Fetching nodeinfo from '{domain}' ...")
     json = fetch_nodeinfo(domain)
-    # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
 
+    # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
     if not isinstance(json, dict) or len(json) == 0:
         # NOISY-DEBUG: print("DEBUG: Could not determine software type:", domain)
-        return None
+        return fetch_generator_from_path(domain)
 
     # NOISY-DEBUG: print("DEBUG: json():", len(json), json)
     if "status" in json and json["status"] == "error" and "message" in json:
         print("WARNING: JSON response is an error:", json["message"])
         update_last_error(domain, json["message"])
-        return None
+        return fetch_generator_from_path(domain)
     elif "software" not in json or "name" not in json["software"]:
         # NOISY-DEBUG: print(f"DEBUG: JSON response from {domain} does not include [software][name], fetching / ...")
-        try:
-            res = reqto.get(f"https://{domain}/", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
-            # NOISY-DEBUG: print("DEBUG: domain,res.ok,res.status_code:", domain, res.ok, res.status_code)
-            if res.ok and res.status_code < 300 and len(res.text) > 0:
-                # NOISY-DEBUG: print("DEBUG: Search for <meta name='generator'>:", domain)
-
-                doc = bs4.BeautifulSoup(res.text, "html.parser")
-                # NOISY-DEBUG: print("DEBUG: doc[]:", type(doc))
-
-                tag = doc.find("meta", {"name": "generator"})
-                # NOISY-DEBUG: print(f"DEBUG: tag[{type(tag)}: {tag}")
-                if isinstance(tag, bs4.element.Tag):
-                    # NOISY-DEBUG: print("DEBUG: Found generator meta tag:", domain)
-                    software = tidyup(tag.get("content"))
-
-        except BaseException as e:
-            print(f"WARNING: Cannot fetch / from '{domain}':", e)
-            update_last_error(domain, e)
-            pass
+        software = fetch_generator_from_path(domain)
 
         # NOISY-DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!")
         return software
 
     software = tidyup(json["software"]["name"])
 
-    # NOISY-DEBUG: print("DEBUG: tidyup software:", software)
+    # NOISY-DEBUG: print("DEBUG: sofware after tidyup():", software)
     if software in ["akkoma", "rebased"]:
         # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, software)
         software = "pleroma"
@@ -462,16 +481,22 @@ def determine_software(domain: str) -> str:
         # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, software)
         software = "misskey"
     elif software.find("/") > 0:
-        print("WARNING: Spliting of path:", software)
+        print("WARNING: Spliting of slash:", software)
         software = software.split("/")[-1];
     elif software.find("|") > 0:
-        print("WARNING: Spliting of path:", software)
-        software = software.split("|")[0].strip();
+        print("WARNING: Spliting of pipe:", software)
+        software = tidyup(software.split("|")[0]);
 
+    # NOISY-DEBUG: print(f"DEBUG: software[]={type(software)}")
     if software == "":
         print("WARNING: tidyup() left no software name behind:", domain)
         software = None
 
+    # NOISY-DEBUG: print(f"DEBUG: software[]={type(software)}")
+    if str(software) == "":
+        # NOISY-DEBUG: print(f"DEBUG: software for '{domain}' was not detected, trying generator ...")
+        software = fetch_generator_from_path(domain)
+
     # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
     return software
 
@@ -489,6 +514,7 @@ def update_block_reason(reason: str, blocker: str, blocked: str, block_level: st
             ),
         )
 
+        # NOISY-DEBUG: print(f"DEBUG: cursor.rowcount={cursor.rowcount}")
         if cursor.rowcount == 0:
             print("WARNING: Did not update any rows:", domain)
 
@@ -496,6 +522,8 @@ def update_block_reason(reason: str, blocker: str, blocked: str, block_level: st
         print("ERROR: failed SQL query:", reason, blocker, blocked, block_level, e)
         sys.exit(255)
 
+    # NOISY-DEBUG: print("DEBUG: EXIT!")
+
 def update_last_seen(blocker: str, blocked: str, block_level: str):
     # NOISY-DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
     try:
@@ -576,15 +604,11 @@ def add_instance(domain: str, origin: str, originator: str):
         if domain in nodeinfos["nodeinfo_url"]:
             # NOISY-DEBUG # NOISY-DEBUG: print("DEBUG: domain has pending nodeinfo being updated:", domain)
             update_nodeinfos(domain)
-            try:
-                # Prevent updating any pending errors, nodeinfo was found
-                del pending_errors[domain]
-            except:
-                pass
+            remove_pending_error(domain)
         elif domain in pending_errors:
             # NOISY-DEBUG: print("DEBUG: domain has pending error being updated:", domain)
             update_last_error(domain, pending_errors[domain])
-            del pending_errors[domain]
+            remove_pending_error(domain)
 
     except BaseException as e:
         print("ERROR: failed SQL query:", domain, e)
@@ -692,7 +716,7 @@ def get_friendica_blocks(domain: str) -> dict:
 
     # Prevents exceptions:
     if blocklist is None:
-        # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
+        # NOISY-DEBUG: print("DEBUG:Instance has no block list:", domain)
         return {}
 
     for line in blocklist.find("table").find_all("tr")[1:]: