]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Wed, 7 Jun 2023 17:08:09 +0000 (19:08 +0200)
committerRoland Häder <roland@mxchange.org>
Wed, 7 Jun 2023 17:23:53 +0000 (19:23 +0200)
- moved gotosocial part to fba/federation/gotosocial.py
- fixed duplicate variable names

fba/commands.py
fba/fba.py
fba/federation/__init__.py
fba/federation/gotosocial.py [new file with mode: 0644]

index 2b7ca9baf0bc3951880e48f14ad2ef03eb3f47ae..4f6839b6a7f196a061fb37c67c3d8e971230af28 100644 (file)
@@ -241,79 +241,7 @@ def fetch_blocks(args: argparse.Namespace):
                 print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
         elif software == "gotosocial":
             print(f"INFO: blocker='{blocker}',software='{software}'")
-            try:
-                # Blocks
-                federation = fba.get_response(blocker, f"{fba.get_peers_url}?filter=suspended", fba.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))).json()
-
-                if (federation == None):
-                    print("WARNING: No valid response:", blocker);
-                elif "error" in federation:
-                    print("WARNING: API returned error:", federation["error"])
-                else:
-                    print(f"INFO: Checking {len(federation)} entries from blocker='{blocker}',software='{software}' ...")
-                    for peer in federation:
-                        blocked = peer["domain"].lower()
-                        # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
-                        blocked = fba.tidyup_domain(blocked)
-                        # DEBUG: print("DEBUG: AFTER blocked:", blocked)
-
-                        if blocked == "":
-                            print("WARNING: blocked is empty:", blocker)
-                            continue
-                        elif blacklist.is_blacklisted(blocked):
-                            # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
-                            continue
-                        elif blocked.count("*") > 0:
-                            # GTS does not have hashes for obscured domains, so we have to guess it
-                            fba.cursor.execute(
-                                "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
-                            )
-                            searchres = fba.cursor.fetchone()
-
-                            if searchres == None:
-                                print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
-                                continue
-
-                            blocked = searchres[0]
-                            origin = searchres[1]
-                            nodeinfo_url = searchres[2]
-                        elif not validators.domain(blocked):
-                            print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
-                            continue
-
-                        # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
-                        if not validators.domain(blocked):
-                            print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
-                            continue
-                        elif not fba.is_instance_registered(blocked):
-                            # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
-                            fba.add_instance(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
-                        if not blocks.is_instance_blocked(blocker, blocked, "reject"):
-                            # DEBUG: print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
-                            blocks.add_instance(blocker, blocked, "unknown", "reject")
-
-                            blockdict.append({
-                                "blocked": blocked,
-                                "reason" : None
-                            })
-                        else:
-                            # DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
-                            blocks.update_last_seen(blocker, blocked, "reject")
-
-                        if "public_comment" in peer:
-                            # DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
-                            blocks.update_reason(peer["public_comment"], blocker, blocked, "reject")
-
-                            for entry in blockdict:
-                                if entry["blocked"] == blocked:
-                                    # DEBUG: print(f"DEBUG: Setting block reason for blocked='{blocked}':'{peer['public_comment']}'")
-                                    entry["reason"] = peer["public_comment"]
-
-                    # DEBUG: print("DEBUG: Committing changes ...")
-                    fba.connection.commit()
-            except Exception as e:
-                print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
+            gotosocial.fetch_blocks(blocker, origin, nodeinfo_url)
         else:
             print("WARNING: Unknown software:", blocker, software)
 
index e6b5539f3d28367fdd26b0412429da81042eb723..95176a9112bbaa5d91e3a5e4a6b3e8b305ef16e1 100644 (file)
@@ -886,7 +886,7 @@ def fetch_friendica_blocks(domain: str) -> dict:
         raise ValueError(f"Parameter 'domain' is empty")
 
     # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
-    blocklist = []
+    blocked = list()
 
     try:
         doc = bs4.BeautifulSoup(
@@ -905,16 +905,26 @@ def fetch_friendica_blocks(domain: str) -> dict:
         # DEBUG: print("DEBUG: Instance has no block list:", domain)
         return {}
 
-    for line in blocklist.find("table").find_all("tr")[1:]:
+    table = blocklist.find("table")
+
+    # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
+    if table.find("tbody"):
+        rows = table.find("tbody").find_all("tr")
+    else:
+        rows = table.find_all("tr")
+
+    # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
+    for line in rows:
         # DEBUG: print(f"DEBUG: line='{line}'")
-        blocklist.append({
+        blocked.append({
             "domain": tidyup_domain(line.find_all("td")[0].text),
-            "reason": tidyup_domain(line.find_all("td")[1].text)
+            "reason": tidyup_reason(line.find_all("td")[1].text)
         })
+        # DEBUG: print("DEBUG: Next!")
 
     # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
     return {
-        "reject": blocklist
+        "reject": blocked
     }
 
 def fetch_misskey_blocks(domain: str) -> dict:
@@ -1072,7 +1082,7 @@ def tidyup_reason(reason: str) -> str:
     # Replace â with "
     reason = re.sub("â", "\"", reason)
 
-    ## DEBUG: print(f"DEBUG: reason='{reason}' - EXIT!")
+    # DEBUG: print(f"DEBUG: reason='{reason}' - EXIT!")
     return reason
 
 def tidyup_domain(domain: str) -> str:
index 31e599293a7734999a1e376a412d7d647d75286d..a2fdf73b16ac22b879dbf36a2be2afd584e4ca1d 100644 (file)
@@ -1,4 +1,5 @@
 __all__ = [
+    'gotosocial',
     'lemmy',
     'mastodon',
     'misskey',
diff --git a/fba/federation/gotosocial.py b/fba/federation/gotosocial.py
new file mode 100644 (file)
index 0000000..606b091
--- /dev/null
@@ -0,0 +1,114 @@
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+import inspect
+import validators
+
+from fba import blacklist
+from fba import blocks
+from fba import config
+from fba import fba
+
+def fetch_blocks(blocker: str, origin: str, nodeinfo_url: str):
+    print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
+    if type(domain) != str:
+        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+    elif domain == "":
+        raise ValueError(f"Parameter 'domain' is empty")
+    elif type(origin) != str and origin != None:
+        raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
+    elif origin == "":
+        raise ValueError(f"Parameter 'origin' is empty")
+    elif type(nodeinfo_url) != str:
+        raise ValueError(f"Parameter nodeinfo_url[]={type(nodeinfo_url)} is not 'str'")
+    elif nodeinfo_url == "":
+        raise ValueError(f"Parameter 'nodeinfo_url' is empty")
+
+    try:
+        # Blocks
+        federation = fba.get_response(blocker, f"{fba.get_peers_url}?filter=suspended", fba.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))).json()
+
+        if (federation == None):
+            print("WARNING: No valid response:", blocker);
+        elif "error" in federation:
+            print("WARNING: API returned error:", federation["error"])
+        else:
+            print(f"INFO: Checking {len(federation)} entries from blocker='{blocker}',software='{software}' ...")
+            for peer in federation:
+                blocked = peer["domain"].lower()
+                # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+                blocked = fba.tidyup_domain(blocked)
+                # DEBUG: print("DEBUG: AFTER blocked:", blocked)
+
+                if blocked == "":
+                    print("WARNING: blocked is empty:", blocker)
+                    continue
+                elif blacklist.is_blacklisted(blocked):
+                    # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+                    continue
+                elif blocked.count("*") > 0:
+                    # GTS does not have hashes for obscured domains, so we have to guess it
+                    fba.cursor.execute(
+                        "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
+                    )
+                    searchres = fba.cursor.fetchone()
+
+                    if searchres == None:
+                        print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
+                        continue
+
+                    blocked = searchres[0]
+                    origin = searchres[1]
+                    nodeinfo_url = searchres[2]
+                elif not validators.domain(blocked):
+                    print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+                    continue
+
+                # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+                if not validators.domain(blocked):
+                    print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+                    continue
+                elif not fba.is_instance_registered(blocked):
+                    # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+                    fba.add_instance(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
+
+                if not blocks.is_instance_blocked(blocker, blocked, "reject"):
+                    # DEBUG: print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
+                    blocks.add_instance(blocker, blocked, "unknown", "reject")
+
+                    blockdict.append({
+                        "blocked": blocked,
+                        "reason" : None
+                    })
+                else:
+                    # DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
+                    blocks.update_last_seen(blocker, blocked, "reject")
+
+                if "public_comment" in peer:
+                    # DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
+                    blocks.update_reason(peer["public_comment"], blocker, blocked, "reject")
+
+                    for entry in blockdict:
+                        if entry["blocked"] == blocked:
+                            # DEBUG: print(f"DEBUG: Setting block reason for blocked='{blocked}':'{peer['public_comment']}'")
+                            entry["reason"] = peer["public_comment"]
+
+            # DEBUG: print("DEBUG: Committing changes ...")
+            fba.connection.commit()
+    except Exception as e:
+        print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(e)}]:'{str(e)}'")
+
+    # DEBUG: print("DEBUG: EXIT!")