]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Mon, 12 Jun 2023 09:22:57 +0000 (11:22 +0200)
committerRoland Häder <roland@mxchange.org>
Mon, 12 Jun 2023 09:25:39 +0000 (11:25 +0200)
- introduced instances.deobsure() for simple deobscuring attempts, e.g. when
  they use * or ? when they don't want to show the domain, you can still
  deobsfucate it with the hash (some provide it) or obscured domain itself

fba/commands.py
fba/instances.py
fba/networks/mastodon.py
fba/networks/pleroma.py
templates/views/scoreboard.html

index 74656c48712575d7abf0de189bf10b0329ace70e..534c861d0f2dd2090cb42ba4cf1e422998c1e3f0 100644 (file)
@@ -210,39 +210,28 @@ def fetch_blocks(args: argparse.Namespace):
                         continue
                     elif blocked.count("*") > 0:
                         # Some friendica servers also obscure domains without hash
-                        fba.cursor.execute(
-                            "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
-                        )
+                        row = instances.deobsure("*", blocked)
 
-                        searchres = fba.cursor.fetchone()
-
-                        # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
-                        if searchres is None:
+                        # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+                        if row is None:
                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
                             continue
 
-                        blocked      = searchres[0]
-                        origin       = searchres[1]
-                        nodeinfo_url = searchres[2]
+                        blocked      = row[0]
+                        origin       = row[1]
+                        nodeinfo_url = row[2]
                     elif blocked.count("?") > 0:
                         # Some obscure them with question marks, not sure if that's dependent on version or not
-                        fba.cursor.execute(
-                            "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("?", "_")]
-                        )
-
-                        searchres = fba.cursor.fetchone()
+                        row = instances.deobsure("?", blocked)
 
-                        # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
-                        if searchres is None:
+                        # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+                        if row is None:
                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
                             continue
 
-                        blocked      = searchres[0]
-                        origin       = searchres[1]
-                        nodeinfo_url = searchres[2]
-                    elif not validators.domain(blocked):
-                        print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
-                        continue
+                        blocked      = row[0]
+                        origin       = row[1]
+                        nodeinfo_url = row[2]
 
                     # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
                     if not validators.domain(blocked):
@@ -554,21 +543,42 @@ def fetch_oliphant(args: argparse.Namespace):
 
             # DEBUG: print(f"DEBUG: reader[]='{type(reader)}'")
             for row in reader:
-                if not validators.domain(row["#domain"]):
-                    print(f"WARNING: domain='{row['#domain']}' is not a valid domain - skipped!")
+                domain = None
+                if "#domain" in row:
+                    domain = row["#domain"]
+                elif "domain" in row:
+                    domain = row["domain"]
+                else:
+                    print(f"DEBUG: row='{row}' does not contain domain column")
+                    continue
+
+                if domain.find("*") > 0:
+                   # Try to de-obsure it
+                   row = instances.deobsure("*", domain)
+
+                   # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+                   if row is None:
+                       print(f"WARNING: Cannot de-obfucate domain='{domain}' - skipped!")
+                       continue
+
+                   # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+                   domain = row[0]
+
+                if not validators.domain(domain):
+                    print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
                     continue
-                elif blacklist.is_blacklisted(row["#domain"]):
-                    print(f"WARNING: domain='{row['#domain']}' is blacklisted - skipped!")
+                elif blacklist.is_blacklisted(domain):
+                    print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
                     continue
-                elif instances.is_recent(row["#domain"]):
-                    # DEBUG: print(f"DEBUG: domain='{row['#domain']}' has been recently checked - skipped!")
+                elif instances.is_recent(domain):
+                    # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - skipped!")
                     continue
 
                 try:
-                    print(f"INFO: Fetching instances for instane='{row['#domain']}' ...")
-                    federation.fetch_instances(row["#domain"], block["blocker"], None, inspect.currentframe().f_code.co_name)
+                    print(f"INFO: Fetching instances for instane='{domain}' ...")
+                    federation.fetch_instances(domain, block["blocker"], None, inspect.currentframe().f_code.co_name)
                 except network.exceptions as exception:
-                    print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{row['#domain']}'")
-                    instances.update_last_error(row["#domain"], exception)
+                    print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
+                    instances.update_last_error(domain, exception)
 
     # DEBUG: print("DEBUG: EXIT!")
index 5b5046f68341423fb20700a7c9689e3a62eec026..ec1dfb75d0083f4315528dd7e1cd0e9452a288e5 100644 (file)
@@ -336,3 +336,30 @@ def is_recent(domain: str) -> bool:
 
     # DEBUG: print(f"DEBUG: recently='{recently}' - EXIT!")
     return recently
+
+def deobsure(char: str, domain: str, hash: str = None) -> tuple:
+    #print(f"DEBUG: char='{char}',domain='{domain}',hash='{hash}' - CALLED!")
+    if not isinstance(char, str):
+        raise ValueError(f"Parameter char[]='{type(char)}' is not 'str'")
+    elif char == "":
+        raise ValueError("Parameter 'char' is empty")
+    elif not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif not isinstance(hash, str) and hash is not None:
+        raise ValueError(f"Parameter hash[]='{type(hash)}' is not 'str'")
+
+    if isinstance(hash, str):
+        fba.cursor.execute(
+            "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [hash]
+        )
+    else:
+        fba.cursor.execute(
+            "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [domain.replace(char, "_")]
+        )
+
+    row = fba.cursor.fetchone()
+
+    #print(f"DEBUG: row[]='{type(row)}' - EXIT!")
+    return row
index 760e5e6e7e6da8c22e462d6d542e67c84b660848..ae5ea0ce26e45e3f6ca8d81457e8ea040a9ed8bc 100644 (file)
@@ -242,20 +242,17 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
                     continue
                 elif blocked.count("*") > 0:
                     # Doing the hash search for instance names as well to tidy up DB
-                    fba.cursor.execute(
-                        "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash]
-                    )
-                    searchres = fba.cursor.fetchone()
+                    row = instances.deobsure("*", blocked, blocked_hash)
 
-                    # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
-                    if searchres is None:
+                    # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+                    if row is None:
                         print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
                         continue
 
-                    # DEBUG: print("DEBUG: Updating domain: ", searchres[0])
-                    blocked      = searchres[0]
-                    origin       = searchres[1]
-                    nodeinfo_url = searchres[2]
+                    # DEBUG: print("DEBUG: Updating domain: ", row[0])
+                    blocked      = row[0]
+                    origin       = row[1]
+                    nodeinfo_url = row[2]
 
                     # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
                     if not validators.domain(blocked):
index 36f85350978fb1f08cd12c6b108d668f9658869e..f9701473c27530f0ae47d25412bb69aaa7709af7 100644 (file)
@@ -97,20 +97,17 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
                         continue
                     elif blocked.count("*") > 0:
                         # Obsured domain name with no hash
-                        # DEBUG: print(f"DEBUG: Trying to de-obscure blocked='{blocked}' ...")
-                        fba.cursor.execute(
-                            "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
-                        )
-                        searchres = fba.cursor.fetchone()
-
-                        # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
-                        if searchres is None:
+                        row = instances.deobsure("*", blocked)
+
+                        # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+                        if row is None:
                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
                             continue
 
-                        # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{searchres[0]}'")
-                        blocked      = searchres[0]
-                        nodeinfo_url = searchres[1]
+                        # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{row[0]}'")
+                        blocked      = row[0]
+                        origin       = row[1]
+                        nodeinfo_url = row[2]
 
                     # DEBUG: print(f"DEBUG: blocked='{blocked}'")
                     if not validators.domain(blocked):
@@ -183,28 +180,24 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
                     # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
                     continue
                 elif blocked.count("*") > 0:
-                    # Obsured domain with no hash
-                    # DEBUG: print(f"DEBUG: Trying to de-obscure blocked='{blocked}' ...")
-                    fba.cursor.execute(
-                        "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
-                    )
-                    searchres = fba.cursor.fetchone()
-
-                    # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
-                    if searchres is None:
+                    # Obsured domain name with no hash
+                    row = instances.deobsure("*", blocked)
+
+                    # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+                    if row is None:
                         print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
                         continue
 
-                    # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{searchres[0]}'")
-                    blocked = searchres[0]
-                    origin = searchres[1]
-                    nodeinfo_url = searchres[2]
-                elif not validators.domain(blocked):
-                    print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
-                    continue
+                    # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{row[0]}'")
+                    blocked      = row[0]
+                    origin       = row[1]
+                    nodeinfo_url = row[2]
 
                 # DEBUG: print(f"DEBUG: blocked='{blocked}'")
-                if blocked.split(".")[-1] == "arpa":
+                if not validators.domain(blocked):
+                    print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
+                    continue
+                elif blocked.split(".")[-1] == "arpa":
                     print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
                     continue
                 elif not instances.is_registered(blocked):
index 5b893c0f38ceeab89c82a378ab9ca845cc71e42c..dbbba79a5cc62ede7a1113105bedddc02affd137 100644 (file)
@@ -1,6 +1,6 @@
 {% extends "base.html" %}
 
-{% block title %}Scoreboard - {% if mode == 'software' %}TOP {{amount}} used software{% elif mode == 'command' %}TOP {{amount}} scripts{% elif mode == 'error_code' %}TOP {{amount}} error codes{% elif mode == 'reference' %}TOP {{amount}} referencing instances{% elif mode == 'blocked' %}TOP {{amount}} deferated instances{% elif mode == 'blocker' %}TOP {{amount}} deferating instances{% endif %}{% endblock %}
+{% block title %}Scoreboard - {% if mode == 'software' %}TOP {{amount}} used software{% elif mode == 'command' %}TOP {{amount}} commands{% elif mode == 'error_code' %}TOP {{amount}} error codes{% elif mode == 'reference' %}TOP {{amount}} referencing instances{% elif mode == 'blocked' %}TOP {{amount}} deferated instances{% elif mode == 'blocker' %}TOP {{amount}} deferating instances{% endif %}{% endblock %}
 
 {% block header %}
     {% if mode == 'blocker' %}
@@ -12,7 +12,7 @@
     {% elif mode == 'software' %}
         <h1>Top {{amount}} used software</h1>
     {% elif mode == 'command' %}
-        <h1>TOP {{amount}} scripts</h1>
+        <h1>TOP {{amount}} commands</h1>
     {% elif mode == 'error_code' %}
         <h1>TOP {{amount}} error codes</h1>
     {% endif %}