git.mxchange.org Git - fba.git/blobdiff - fba/networks/pleroma.py
Continued:
[fba.git] / fba / networks / pleroma.py
index 5a6f7ec3709578b687ae36353b9bd708f168e029..ade05924bbebd80d9691bee356d929eaa2550d87 100644 (file)
 import inspect
 import validators
 
+import bs4
+
 from fba import blacklist
 from fba import blocks
+from fba import config
 from fba import fba
 from fba import federation
 from fba import instances
+from fba import network
+
 from fba.helpers import tidyup
 
 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
@@ -41,7 +46,11 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
 
     # Blocks
     blockdict = list()
-    rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
+    rows = None
+    try:
+        rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
+    except network.exceptions as exception:
+        print(f"WARNING: Exception '{type(exception)}' during fetching nodeinfo")
 
     if rows is None:
         print("WARNING: Could not fetch nodeinfo from domain:", domain)
@@ -58,14 +67,15 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
 
     data = rows["metadata"]["federation"]
 
-    if "enabled" in data:
-        # DEBUG: print("DEBUG: Instance has no block list to analyze:", domain)
-        return
-
     if "mrf_simple" in data:
+        # DEBUG: print("DEBUG: Found mrf_simple:", domain)
         for block_level, blocklist in (
-            {**data["mrf_simple"],
-            **{"quarantined_instances": data["quarantined_instances"]}}
+            {
+                **data["mrf_simple"],
+                **{
+                    "quarantined_instances": data["quarantined_instances"]
+                }
+            }
         ).items():
             # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
             block_level = tidyup.domain(block_level)
@@ -95,26 +105,26 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
                         )
                         searchres = fba.cursor.fetchone()
 
-                        print(f"DEBUG: searchres[]='{type(searchres)}'")
+                        # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
                         if searchres is None:
                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
                             continue
 
-                        blocked = searchres[0]
+                        blocked      = searchres[0]
                         nodeinfo_url = searchres[1]
                         # DEBUG: print("DEBUG: Looked up domain:", blocked)
                     elif not validators.domain(blocked):
                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
                         continue
-
-                    # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
-                    if not validators.domain(blocked):
-                        print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
-                        continue
                     elif blocked.split(".")[-1] == "arpa":
                         print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
                         continue
-                    elif not instances.is_registered(blocked):
+
+                    # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+                    if not instances.is_registered(blocked):
+                        # Commit changes
+                        fba.connection.commit()
+
                         # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
                         instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
 
@@ -131,9 +141,6 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
                         else:
                             # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
                             blocks.update_last_seen(domain, blocked, block_level)
-            else:
-                # DEBUG: print(f"DEBUG: domain='{domain}' has returned zero rows, trying /about/more page ...")
-                rows = fetch_blocks_from_about(domain)
 
     # DEBUG: print("DEBUG: Committing changes ...")
     fba.connection.commit()
@@ -142,10 +149,10 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
     if "mrf_simple_info" in data:
         # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
         for block_level, info in (
-            {**data["mrf_simple_info"],
-            **(data["quarantined_instances_info"]
-            if "quarantined_instances_info" in data
-            else {})}
+            {
+                **data["mrf_simple_info"],
+                **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
+            }
         ).items():
             # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
             block_level = tidyup.domain(block_level)
@@ -157,9 +164,18 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
 
             # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
             for blocked, reason in info.items():
-                # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
+                # DEBUG: print(f"DEBUG: blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
                 blocked = tidyup.domain(blocked)
-                reason  = tidyup.reason(reason) if reason is not None and reason != "" else None
+
+                if isinstance(reason, str):
+                    # DEBUG: print("DEBUG: reason[] is a string")
+                    reason = tidyup.reason(reason)
+                elif isinstance(reason, dict) and "reason" in reason:
+                    # DEBUG: print("DEBUG: reason[] is a dict")
+                    reason = tidyup.reason(reason["reason"])
+                elif reason is not None:
+                    raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
+
                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
 
                 if blocked == "":
@@ -192,91 +208,17 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
                     print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
                     continue
                 elif not instances.is_registered(blocked):
-                    # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+                    # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodein
                     instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
 
-                # DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
-                blocks.update_reason(reason["reason"], domain, blocked, block_level)
+                # DEBUG: print(f"DEBUG: Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
+                blocks.update_reason(reason, domain, blocked, block_level)
 
                 # DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
                 for entry in blockdict:
                     if entry["blocked"] == blocked:
-                        # DEBUG: print("DEBUG: Updating entry reason:", blocked)
-                        entry["reason"] = reason["reason"]
+                        # DEBUG: print(f"DEBUG: Updating entry reason: blocked='{blocked}',reason='{reason}'")
+                        entry["reason"] = reason
 
     fba.connection.commit()
-
     # DEBUG: print("DEBUG: EXIT!")
-
-def fetch_blocks_from_about(domain: str) -> dict:
-    print(f"DEBUG: domain='{domain}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-
-    print("DEBUG: Fetching mastodon blocks from domain:", domain)
-    blocklist = {
-        "Suspended servers": [],
-        "Filtered media"   : [],
-        "Limited servers"  : [],
-        "Silenced servers" : [],
-    }
-
-    doc = None
-    for path in ("/about/more", "/about"):
-        try:
-            print(f"DEBUG: Fetching path='{path}' from domain='{domain}' ...")
-            doc = bs4.BeautifulSoup(
-                network.fetch_response(
-                    domain,
-                    path,
-                    network.web_headers,
-                    (config.get("connection_timeout"), config.get("read_timeout"))
-                ).text,
-                "html.parser",
-            )
-
-            if len(doc.find_all("h3")) > 0:
-                print(f"DEBUG: path='{path}' had some headlines - BREAK!")
-                break
-
-        except BaseException as exception:
-            print("ERROR: Cannot fetch from domain:", domain, exception)
-            instances.update_last_error(domain, exception)
-            break
-
-    print(f"DEBUG: doc[]='{type(doc)}'")
-    if doc is None:
-        print(f"WARNING: Cannot find any 'h3' tags for domain='{domain}' - EXIT!")
-        return blocklist
-
-    for header in doc.find_all("h3"):
-        header_text = tidyup.reason(header.text)
-
-        print(f"DEBUG: header_text='{header_text}'")
-        if header_text in language_mapping:
-            print(f"DEBUG: header_text='{header_text}'")
-            header_text = language_mapping[header_text]
-        else:
-            print(f"WARNING: header_text='{header_text}' not found in language mapping table")
-
-        if header_text in blocklist or header_text.lower() in blocklist:
-            # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
-            for line in header.find_all_next("table")[0].find_all("tr")[1:]:
-                blocklist[header_text].append(
-                    {
-                        "domain": tidyup.domain(line.find("span").text),
-                        "hash"  : tidyup.domain(line.find("span")["title"][9:]),
-                        "reason": tidyup.reason(line.find_all("td")[1].text),
-                    }
-                )
-        else:
-            print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
-
-    print("DEBUG: Returning blocklist for domain:", domain)
-    return {
-        "reject"        : blocklist["Suspended servers"],
-        "media_removal" : blocklist["Filtered media"],
-        "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
-    }