]> git.mxchange.org Git - fba.git/commitdiff
WIP:
authorRoland Häder <roland@mxchange.org>
Wed, 17 May 2023 05:49:20 +0000 (07:49 +0200)
committerRoland Häder <roland@mxchange.org>
Wed, 17 May 2023 05:49:20 +0000 (07:49 +0200)
- moved functions to fba.py module

fba.py [new file with mode: 0644]
fetch_blocks.py
fetch_instances.py

diff --git a/fba.py b/fba.py
new file mode 100644 (file)
index 0000000..c93996d
--- /dev/null
+++ b/fba.py
@@ -0,0 +1,314 @@
+from reqto import get
+from reqto import post
+from bs4 import BeautifulSoup
+from reqto import get
+from hashlib import sha256
+import sqlite3
+import json
+import sys
+
+with open("config.json") as f:
+    config = json.loads(f.read())
+
+blacklist = [
+    "activitypub-troll.cf",
+    "gab.best",
+    "4chan.icu",
+    "social.shrimpcam.pw",
+    "mastotroll.netz.org"
+]
+
+headers = {
+    "user-agent": config["useragent"]
+}
+
+conn = sqlite3.connect("blocks.db")
+c = conn.cursor()
+
+def get_hash(domain: str) -> str:
+    return sha256(domain.encode("utf-8")).hexdigest()
+
+def get_peers(domain: str) -> str:
+    try:
+        res = get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
+        return res.json()
+    except:
+        print("WARNING: Cannot fetch peers:", domain)
+        return None
+
+def get_type(instdomain: str) -> str:
+    try:
+        res = get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
+        if res.status_code == 404:
+            res = get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
+        if res.status_code == 404:
+            res = get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
+        if res.ok and "text/html" in res.headers["content-type"]:
+            res = get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
+        if res.ok:
+            if res.json()["software"]["name"] in ["akkoma", "rebased"]:
+                return "pleroma"
+            elif res.json()["software"]["name"] in ["hometown", "ecko"]:
+                return "mastodon"
+            elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
+                return "misskey"
+            else:
+                return res.json()["software"]["name"]
+        elif res.status_code == 404:
+            res = get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
+        if res.ok:
+            return "mastodon"
+    except:
+        return None
+
+def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
+    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
+    try:
+        c.execute(
+            "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
+            (
+                reason,
+                blocker,
+                blocked,
+                block_level
+            ),
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
+    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
+    try:
+        c.execute(
+            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
+            (
+                last_seen,
+                blocker,
+                blocked,
+                block_level
+            )
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
+    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
+    try:
+        c.execute(
+            "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
+             (
+                 blocker,
+                 blocked,
+                 reason,
+                 block_level,
+                 first_added,
+                 last_seen
+             ),
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def add_instance(domain: str):
+    print("--- Adding new instance:", domain)
+    try:
+        c.execute(
+            "INSERT INTO instances SELECT ?, ?, ?",
+            (
+               domain,
+               get_hash(domain),
+               get_type(domain)
+            ),
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def send_bot_post(instance: str, blocks: dict):
+    message = instance + " has blocked the following instances:\n\n"
+    truncated = False
+
+    if len(blocks) > 20:
+        truncated = True
+        blocks = blocks[0 : 19]
+
+    for block in blocks:
+        if block["reason"] == None or block["reason"] == '':
+            message = message + block["blocked"] + " with unspecified reason\n"
+        else:
+            if len(block["reason"]) > 420:
+                block["reason"] = block["reason"][0:419] + "[…]"
+            message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
+    if truncated:
+        message = message + "(the list has been truncated to the first 20 entries)"
+
+    botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
+    req = post(f"{config['bot_instance']}/api/v1/statuses",
+        data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
+        headers=botheaders, timeout=10).json()
+    return True
+
+def get_mastodon_blocks(domain: str) -> dict:
+    blocks = {
+        "Suspended servers": [],
+        "Filtered media": [],
+        "Limited servers": [],
+        "Silenced servers": [],
+    }
+
+    translations = {
+        "Silenced instances": "Silenced servers",
+        "Suspended instances": "Suspended servers",
+        "Gesperrte Server": "Suspended servers",
+        "Gefilterte Medien": "Filtered media",
+        "Stummgeschaltete Server": "Silenced servers",
+        "停止済みのサーバー": "Suspended servers",
+        "メディアを拒否しているサーバー": "Filtered media",
+        "サイレンス済みのサーバー": "Silenced servers",
+        "שרתים מושעים": "Suspended servers",
+        "מדיה מסוננת": "Filtered media",
+        "שרתים מוגבלים": "Silenced servers",
+        "Serveurs suspendus": "Suspended servers",
+        "Médias filtrés": "Filtered media",
+        "Serveurs limités": "Silenced servers",
+    }
+
+    try:
+        doc = BeautifulSoup(
+            get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
+            "html.parser",
+        )
+    except:
+        print("ERROR: Cannot fetch from domain:", domain)
+        return {}
+
+    for header in doc.find_all("h3"):
+        header_text = header.text
+        if header_text in translations:
+            header_text = translations[header_text]
+        if header_text in blocks:
+            # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
+            for line in header.find_all_next("table")[0].find_all("tr")[1:]:
+                blocks[header_text].append(
+                    {
+                        "domain": line.find("span").text,
+                        "hash": line.find("span")["title"][9:],
+                        "reason": line.find_all("td")[1].text.strip(),
+                    }
+                )
+    return {
+        "reject": blocks["Suspended servers"],
+        "media_removal": blocks["Filtered media"],
+        "followers_only": blocks["Limited servers"]
+        + blocks["Silenced servers"],
+    }
+
+def get_friendica_blocks(domain: str) -> dict:
+    blocks = []
+
+    try:
+        doc = BeautifulSoup(
+            get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
+            "html.parser",
+        )
+    except:
+        return {}
+
+    blocklist = doc.find(id="about_blocklist")
+
+    # Prevents exceptions:
+    if blocklist is None:
+        print("Instance has no block list:", domain)
+        return {}
+
+    for line in blocklist.find("table").find_all("tr")[1:]:
+        blocks.append(
+            {
+                "domain": line.find_all("td")[0].text.strip(),
+                "reason": line.find_all("td")[1].text.strip()
+            }
+        )
+
+    return {
+        "reject": blocks
+    }
+
+def get_misskey_blocks(domain: str) -> dict:
+    blocks = {
+        "suspended": [],
+        "blocked": []
+    }
+
+    try:
+        counter = 0
+        step = 99
+        while True:
+            # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
+            try:
+                if counter == 0:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                else:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                for instance in doc:
+                    # just in case
+                    if instance["isSuspended"]:
+                        blocks["suspended"].append(
+                            {
+                                "domain": instance["host"],
+                                # no reason field, nothing
+                                "reason": ""
+                            }
+                        )
+                counter = counter + step
+            except:
+                counter = 0
+                break
+
+        while True:
+            # same shit, different asshole ("blocked" aka full suspend)
+            try:
+                if counter == 0:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                else:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                for instance in doc:
+                    if instance["isBlocked"]:
+                        blocks["blocked"].append(
+                            {
+                                "domain": instance["host"],
+                                "reason": ""
+                            }
+                        )
+                counter = counter + step
+            except:
+                counter = 0
+                break
+
+        return {
+            "reject": blocks["blocked"],
+            "followers_only": blocks["suspended"]
+        }
+
+    except:
+        return {}
+
+def tidyup(domain: str) -> str:
+    # some retards put their blocks in variable case
+    domain = domain.lower()
+    # other retards put the port
+    domain = re.sub("\:\d+$", "", domain)
+    # bigger retards put the schema in their blocklist, sometimes even without slashes
+    domain = re.sub("^https?\:(\/*)", "", domain)
+    # and trailing slash
+    domain = re.sub("\/$", "", domain)
+    # and the @
+    domain = re.sub("^\@", "", domain)
+    # the biggest retards of them all try to block individual users
+    domain = re.sub("(.+)\@", "", domain)
+    return domain
index 58f86599f694b050381666b8032797c7a778da37..b5cb55d7ed806be959201d67e74fcc361c4c2620 100644 (file)
 from reqto import get
 from reqto import post
 from hashlib import sha256
 from reqto import get
 from reqto import post
 from hashlib import sha256
-import sqlite3
 from bs4 import BeautifulSoup
 from json import dumps
 from json import loads
 import re
 from time import time
 import itertools
 from bs4 import BeautifulSoup
 from json import dumps
 from json import loads
 import re
 from time import time
 import itertools
-
-with open("config.json") as f:
-    config = loads(f.read())
-
-headers = {
-    "user-agent": config["useragent"]
-}
-
-def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
-    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
-    c.execute(
-        "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
-        (
-            reason,
-            blocker,
-            blocked,
-            block_level
-        ),
-    )
-
-def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
-    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
-    c.execute(
-        "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
-        (
-            last_seen,
-            blocker,
-            blocked,
-            block_level
-        )
-    )
-
-def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
-    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
-    c.execute(
-        "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
-         (
-             blocker,
-             blocked,
-             reason,
-             block_level,
-             first_added,
-             last_seen
-         ),
-    )
-
-def add_instance (domain: str):
-    print("--- Adding new instance:", domain)
-    c.execute(
-        "INSERT INTO instances SELECT ?, ?, ?",
-        (
-           blocked,
-           get_hash(blocked),
-           get_type(blocked)
-        ),
-    )
-
-def send_bot_post(instance: str, blocks: dict):
-    message = instance + " has blocked the following instances:\n\n"
-    truncated = False
-    if len(blocks) > 20:
-        truncated = True
-        blocks = blocks[0 : 19]
-    for block in blocks:
-        if block["reason"] == None or block["reason"] == '':
-            message = message + block["blocked"] + " with unspecified reason\n"
-        else:
-            if len(block["reason"]) > 420:
-                block["reason"] = block["reason"][0:419] + "[…]"
-            message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
-    if truncated:
-        message = message + "(the list has been truncated to the first 20 entries)"
-
-    botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
-    req = post(f"{config['bot_instance']}/api/v1/statuses",
-        data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
-        headers=botheaders, timeout=10).json()
-    return True
-
-def get_mastodon_blocks(domain: str) -> dict:
-    blocks = {
-        "Suspended servers": [],
-        "Filtered media": [],
-        "Limited servers": [],
-        "Silenced servers": [],
-    }
-
-    translations = {
-        "Silenced instances": "Silenced servers",
-        "Suspended instances": "Suspended servers",
-        "Gesperrte Server": "Suspended servers",
-        "Gefilterte Medien": "Filtered media",
-        "Stummgeschaltete Server": "Silenced servers",
-        "停止済みのサーバー": "Suspended servers",
-        "メディアを拒否しているサーバー": "Filtered media",
-        "サイレンス済みのサーバー": "Silenced servers",
-        "שרתים מושעים": "Suspended servers",
-        "מדיה מסוננת": "Filtered media",
-        "שרתים מוגבלים": "Silenced servers",
-        "Serveurs suspendus": "Suspended servers",
-        "Médias filtrés": "Filtered media",
-        "Serveurs limités": "Silenced servers",
-    }
-
-    try:
-        doc = BeautifulSoup(
-            get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
-            "html.parser",
-        )
-    except:
-        return {}
-
-    for header in doc.find_all("h3"):
-        header_text = header.text
-        if header_text in translations:
-            header_text = translations[header_text]
-        if header_text in blocks:
-            # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
-            for line in header.find_all_next("table")[0].find_all("tr")[1:]:
-                blocks[header_text].append(
-                    {
-                        "domain": line.find("span").text,
-                        "hash": line.find("span")["title"][9:],
-                        "reason": line.find_all("td")[1].text.strip(),
-                    }
-                )
-    return {
-        "reject": blocks["Suspended servers"],
-        "media_removal": blocks["Filtered media"],
-        "followers_only": blocks["Limited servers"]
-        + blocks["Silenced servers"],
-    }
-
-def get_friendica_blocks(domain: str) -> dict:
-    blocks = []
-
-    try:
-        doc = BeautifulSoup(
-            get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
-            "html.parser",
-        )
-    except:
-        return {}
-
-    blocklist = doc.find(id="about_blocklist")
-
-    # Prevents exceptions:
-    if blocklist is None:
-        print("Instance has no block list:", domain)
-        return {}
-
-    for line in blocklist.find("table").find_all("tr")[1:]:
-        blocks.append(
-            {
-                "domain": line.find_all("td")[0].text.strip(),
-                "reason": line.find_all("td")[1].text.strip()
-            }
-        )
-
-    return {
-        "reject": blocks
-    }
-
-def get_misskey_blocks(domain: str) -> dict:
-    blocks = {
-        "suspended": [],
-        "blocked": []
-    }
-
-    try:
-        counter = 0
-        step = 99
-        while True:
-            # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
-            try:
-                if counter == 0:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                else:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                for instance in doc:
-                    # just in case
-                    if instance["isSuspended"]:
-                        blocks["suspended"].append(
-                            {
-                                "domain": instance["host"],
-                                # no reason field, nothing
-                                "reason": ""
-                            }
-                        )
-                counter = counter + step
-            except:
-                counter = 0
-                break
-
-        while True:
-            # same shit, different asshole ("blocked" aka full suspend)
-            try:
-                if counter == 0:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                else:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                for instance in doc:
-                    if instance["isBlocked"]:
-                        blocks["blocked"].append(
-                            {
-                                "domain": instance["host"],
-                                "reason": ""
-                            }
-                        )
-                counter = counter + step
-            except:
-                counter = 0
-                break
-
-        return {
-            "reject": blocks["blocked"],
-            "followers_only": blocks["suspended"]
-        }
-
-    except:
-        return {}
-
-def get_hash(domain: str) -> str:
-    return sha256(domain.encode("utf-8")).hexdigest()
-
-
-def get_type(domain: str) -> str:
-    try:
-        res = get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
-        if res.ok and "text/html" in res.headers["content-type"]:
-            res = get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
-        if res.ok:
-            if res.json()["software"]["name"] in ["akkoma", "rebased"]:
-                return "pleroma"
-            elif res.json()["software"]["name"] in ["hometown", "ecko"]:
-                return "mastodon"
-            elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
-                return "misskey"
-            else:
-                return res.json()["software"]["name"]
-        elif res.status_code == 404:
-            res = get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
-        if res.ok:
-            return "mastodon"
-    except:
-        return None
-
-def tidyup(domain: str) -> str:
-    # some retards put their blocks in variable case
-    domain = domain.lower()
-    # other retards put the port
-    domain = re.sub("\:\d+$", "", domain)
-    # bigger retards put the schema in their blocklist, sometimes even without slashes
-    domain = re.sub("^https?\:(\/*)", "", domain)
-    # and trailing slash
-    domain = re.sub("\/$", "", domain)
-    # and the @
-    domain = re.sub("^\@", "", domain)
-    # the biggest retards of them all try to block individual users
-    domain = re.sub("(.+)\@", "", domain)
-    return domain
-
-conn = sqlite3.connect("blocks.db")
-c = conn.cursor()
+from fba import c
+import fba
 
 c.execute(
     "SELECT domain, software FROM instances WHERE domain='tooting.intensifi.es'"
 
 c.execute(
     "SELECT domain, software FROM instances WHERE domain='tooting.intensifi.es'"
@@ -288,7 +17,7 @@ c.execute(
 
 for blocker, software in c.fetchall():
     blockdict = []
 
 for blocker, software in c.fetchall():
     blockdict = []
-    blocker = tidyup(blocker)
+    blocker = fba.tidyup(blocker)
     if software == "pleroma":
         print(blocker)
         try:
     if software == "pleroma":
         print(blocker)
         try:
@@ -302,10 +31,10 @@ for blocker, software in c.fetchall():
                     **{"quarantined_instances": federation["quarantined_instances"]}}
                 ).items():
                     for blocked in blocks:
                     **{"quarantined_instances": federation["quarantined_instances"]}}
                 ).items():
                     for blocked in blocks:
-                        blocked = tidyup(blocked)
+                        blocked = fba.tidyup(blocked)
 
                         if blocked == "":
 
                         if blocked == "":
-                            print("WARNING: blocked is empty after tidyup():", blocker, block_level)
+                            print("WARNING: blocked is empty after fba.tidyup():", blocker, block_level)
                             continue
 
                         if blocked.count("*") > 1:
                             continue
 
                         if blocked.count("*") > 1:
@@ -344,7 +73,7 @@ for blocker, software in c.fetchall():
                                     })
                         else:
                             update_last_seen(timestamp, blocker, blocked, block_level)
                                     })
                         else:
                             update_last_seen(timestamp, blocker, blocked, block_level)
-            conn.commit()
+            fba.conn.commit()
             # Reasons
             if "mrf_simple_info" in federation:
                 for block_level, info in (
             # Reasons
             if "mrf_simple_info" in federation:
                 for block_level, info in (
@@ -354,10 +83,10 @@ for blocker, software in c.fetchall():
                     else {})}
                 ).items():
                     for blocked, reason in info.items():
                     else {})}
                 ).items():
                     for blocked, reason in info.items():
-                        blocked = tidyup(blocked)
+                        blocked = fba.tidyup(blocked)
 
                         if blocked == "":
 
                         if blocked == "":
-                            print("WARNING: blocked is empty after tidyup():", blocker, block_level)
+                            print("WARNING: blocked is empty after fba.tidyup():", blocker, block_level)
                             continue
 
                         if blocked.count("*") > 1:
                             continue
 
                         if blocked.count("*") > 1:
@@ -376,7 +105,7 @@ for blocker, software in c.fetchall():
                             if entry["blocked"] == blocked:
                                 entry["reason"] = reason["reason"]
 
                             if entry["blocked"] == blocked:
                                 entry["reason"] = reason["reason"]
 
-            conn.commit()
+            fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
     elif software == "mastodon":
         except Exception as e:
             print("error:", e, blocker)
     elif software == "mastodon":
@@ -419,12 +148,12 @@ for blocker, software in c.fetchall():
                     else:
                         print("WARNING: Unknown severity:", block['severity'], block['domain'])
             except:
                     else:
                         print("WARNING: Unknown severity:", block['severity'], block['domain'])
             except:
-                json = get_mastodon_blocks(blocker)
+                json = fba.get_mastodon_blocks(blocker)
 
             for block_level, blocks in json.items():
                 for instance in blocks:
                     blocked, blocked_hash, reason = instance.values()
 
             for block_level, blocks in json.items():
                 for instance in blocks:
                     blocked, blocked_hash, reason = instance.values()
-                    blocked = tidyup(blocked)
+                    blocked = fba.tidyup(blocked)
 
                     if blocked.count("*") <= 1:
                         c.execute(
 
                     if blocked.count("*") <= 1:
                         c.execute(
@@ -466,20 +195,20 @@ for blocker, software in c.fetchall():
                     if reason != '':
                         update_block_reason(reason, blocker, blocked if blocked.count("*") <= 1 else blocked_hash, block_level)
 
                     if reason != '':
                         update_block_reason(reason, blocker, blocked if blocked.count("*") <= 1 else blocked_hash, block_level)
 
-            conn.commit()
+            fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
     elif software == "friendica" or software == "misskey":
         print(blocker)
         try:
             if software == "friendica":
         except Exception as e:
             print("error:", e, blocker)
     elif software == "friendica" or software == "misskey":
         print(blocker)
         try:
             if software == "friendica":
-                json = get_friendica_blocks(blocker)
+                json = fba.get_friendica_blocks(blocker)
             elif software == "misskey":
             elif software == "misskey":
-                json = get_misskey_blocks(blocker)
+                json = fba.get_misskey_blocks(blocker)
             for block_level, blocks in json.items():
                 for instance in blocks:
                     blocked, reason = instance.values()
             for block_level, blocks in json.items():
                 for instance in blocks:
                     blocked, reason = instance.values()
-                    blocked = tidyup(blocked)
+                    blocked = fba.tidyup(blocked)
 
                     print("BEFORE-blocked:", blocked)
                     if blocked.count("*") > 0:
 
                     print("BEFORE-blocked:", blocked)
                     if blocked.count("*") > 0:
@@ -528,7 +257,7 @@ for blocker, software in c.fetchall():
                     if reason != '':
                         update_block_reason(reason, blocker, blocked, block_level)
 
                     if reason != '':
                         update_block_reason(reason, blocker, blocked, block_level)
 
-            conn.commit()
+            fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
     elif software == "gotosocial":
         except Exception as e:
             print("error:", e, blocker)
     elif software == "gotosocial":
@@ -586,7 +315,7 @@ for blocker, software in c.fetchall():
                         for entry in blockdict:
                             if entry["blocked"] == blocked:
                                 entry["reason"] = reason
                         for entry in blockdict:
                             if entry["blocked"] == blocked:
                                 entry["reason"] = reason
-                conn.commit()
+                fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
 
         except Exception as e:
             print("error:", e, blocker)
 
@@ -594,4 +323,4 @@ for blocker, software in c.fetchall():
         send_bot_post(blocker, blockdict)
     blockdict = []
 
         send_bot_post(blocker, blockdict)
     blockdict = []
 
-conn.close()
+fba.conn.close()
index 20df590b761c1def4fdfc84e9b1b7586cfe352bd..5b0bb6ee84966219bb17cb8e0f1c391f68d58728 100644 (file)
@@ -1,72 +1,17 @@
-from reqto import get
-from hashlib import sha256
 import sqlite3
 import sys
 import json
 import sqlite3
 import sys
 import json
-
-with open("config.json") as f:
-    config = json.loads(f.read())
+from fba import c
+import fba
 
 domain = sys.argv[1]
 
 
 domain = sys.argv[1]
 
-blacklist = [
-    "activitypub-troll.cf",
-    "gab.best",
-    "4chan.icu",
-    "social.shrimpcam.pw",
-    "mastotroll.netz.org"
-]
-
-headers = {
-    "user-agent": config["useragent"]
-}
-
-def get_hash(domain: str) -> str:
-    return sha256(domain.encode("utf-8")).hexdigest()
-
-def get_peers(domain: str) -> str:
-    try:
-        res = get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
-        return res.json()
-    except:
-        print("WARNING: Cannot fetch peers:", domain)
-        return None
-
-peerlist = get_peers(domain)
+peerlist = fba.get_peers(domain)
 
 if (peerlist is None):
     print("FATAL: CANNOT FETCH PEERS:", domain)
     sys.exit(255)
 
 
 if (peerlist is None):
     print("FATAL: CANNOT FETCH PEERS:", domain)
     sys.exit(255)
 
-def get_type(instdomain: str) -> str:
-    try:
-        res = get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
-        if res.ok and "text/html" in res.headers["content-type"]:
-            res = get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
-        if res.ok:
-            if res.json()["software"]["name"] in ["akkoma", "rebased"]:
-                return "pleroma"
-            elif res.json()["software"]["name"] in ["hometown", "ecko"]:
-                return "mastodon"
-            elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
-                return "misskey"
-            else:
-                return res.json()["software"]["name"]
-        elif res.status_code == 404:
-            res = get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
-        if res.ok:
-            return "mastodon"
-    except:
-        return None
-
-
-conn = sqlite3.connect("blocks.db")
-c = conn.cursor()
-
 c.execute(
     "SELECT domain FROM instances WHERE 1"
 )
 c.execute(
     "SELECT domain FROM instances WHERE 1"
 )
@@ -75,11 +20,12 @@ for instance in peerlist:
     instance = instance.lower()
 
     blacklisted = False
     instance = instance.lower()
 
     blacklisted = False
-    for domain in blacklist:
+    for domain in fba.blacklist:
         if domain in instance:
             blacklisted = True
 
     if blacklisted:
         if domain in instance:
             blacklisted = True
 
     if blacklisted:
+        print("domain is blacklisted:", domain)
         continue
 
     print(instance)
         continue
 
     print(instance)
@@ -87,12 +33,12 @@ for instance in peerlist:
         c.execute(
             "SELECT domain FROM instances WHERE domain = ?", (instance,)
         )
         c.execute(
             "SELECT domain FROM instances WHERE domain = ?", (instance,)
         )
+
         if c.fetchone() == None:
         if c.fetchone() == None:
-            c.execute(
-                "INSERT INTO instances SELECT ?, ?, ?",
-                (instance, get_hash(instance), get_type(instance)),
-            )
-        conn.commit()
+            fba.add_instance(instance)
+
+        fba.conn.commit()
     except Exception as e:
         print("error:", e, instance)
     except Exception as e:
         print("error:", e, instance)
-conn.close()
+
+fba.conn.close()