]> git.mxchange.org Git - fba.git/commitdiff
WIP:
authorRoland Häder <roland@mxchange.org>
Wed, 17 May 2023 05:49:20 +0000 (07:49 +0200)
committerRoland Häder <roland@mxchange.org>
Wed, 17 May 2023 05:49:20 +0000 (07:49 +0200)
- moved functions to fba.py module

fba.py [new file with mode: 0644]
fetch_blocks.py
fetch_instances.py

diff --git a/fba.py b/fba.py
new file mode 100644 (file)
index 0000000..c93996d
--- /dev/null
+++ b/fba.py
@@ -0,0 +1,314 @@
+from reqto import get
+from reqto import post
+from bs4 import BeautifulSoup
+from reqto import get
+from hashlib import sha256
+import sqlite3
+import json
+import sys
+
+with open("config.json") as f:
+    config = json.loads(f.read())
+
+blacklist = [
+    "activitypub-troll.cf",
+    "gab.best",
+    "4chan.icu",
+    "social.shrimpcam.pw",
+    "mastotroll.netz.org"
+]
+
+headers = {
+    "user-agent": config["useragent"]
+}
+
+conn = sqlite3.connect("blocks.db")
+c = conn.cursor()
+
+def get_hash(domain: str) -> str:
+    return sha256(domain.encode("utf-8")).hexdigest()
+
+def get_peers(domain: str) -> str:
+    try:
+        res = get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
+        return res.json()
+    except:
+        print("WARNING: Cannot fetch peers:", domain)
+        return None
+
+def get_type(instdomain: str) -> str:
+    try:
+        res = get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
+        if res.status_code == 404:
+            res = get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
+        if res.status_code == 404:
+            res = get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
+        if res.ok and "text/html" in res.headers["content-type"]:
+            res = get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
+        if res.ok:
+            if res.json()["software"]["name"] in ["akkoma", "rebased"]:
+                return "pleroma"
+            elif res.json()["software"]["name"] in ["hometown", "ecko"]:
+                return "mastodon"
+            elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
+                return "misskey"
+            else:
+                return res.json()["software"]["name"]
+        elif res.status_code == 404:
+            res = get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
+        if res.ok:
+            return "mastodon"
+    except:
+        return None
+
+def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
+    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
+    try:
+        c.execute(
+            "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
+            (
+                reason,
+                blocker,
+                blocked,
+                block_level
+            ),
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
+    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
+    try:
+        c.execute(
+            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
+            (
+                last_seen,
+                blocker,
+                blocked,
+                block_level
+            )
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
+    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
+    try:
+        c.execute(
+            "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
+             (
+                 blocker,
+                 blocked,
+                 reason,
+                 block_level,
+                 first_added,
+                 last_seen
+             ),
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def add_instance(domain: str):
+    print("--- Adding new instance:", domain)
+    try:
+        c.execute(
+            "INSERT INTO instances SELECT ?, ?, ?",
+            (
+               domain,
+               get_hash(domain),
+               get_type(domain)
+            ),
+        )
+    except:
+        print("ERROR: failed SQL query")
+        sys.exit(255)
+
+def send_bot_post(instance: str, blocks: dict):
+    message = instance + " has blocked the following instances:\n\n"
+    truncated = False
+
+    if len(blocks) > 20:
+        truncated = True
+        blocks = blocks[0 : 19]
+
+    for block in blocks:
+        if block["reason"] == None or block["reason"] == '':
+            message = message + block["blocked"] + " with unspecified reason\n"
+        else:
+            if len(block["reason"]) > 420:
+                block["reason"] = block["reason"][0:419] + "[…]"
+            message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
+    if truncated:
+        message = message + "(the list has been truncated to the first 20 entries)"
+
+    botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
+    req = post(f"{config['bot_instance']}/api/v1/statuses",
+        data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
+        headers=botheaders, timeout=10).json()
+    return True
+
+def get_mastodon_blocks(domain: str) -> dict:
+    blocks = {
+        "Suspended servers": [],
+        "Filtered media": [],
+        "Limited servers": [],
+        "Silenced servers": [],
+    }
+
+    translations = {
+        "Silenced instances": "Silenced servers",
+        "Suspended instances": "Suspended servers",
+        "Gesperrte Server": "Suspended servers",
+        "Gefilterte Medien": "Filtered media",
+        "Stummgeschaltete Server": "Silenced servers",
+        "停止済みのサーバー": "Suspended servers",
+        "メディアを拒否しているサーバー": "Filtered media",
+        "サイレンス済みのサーバー": "Silenced servers",
+        "שרתים מושעים": "Suspended servers",
+        "מדיה מסוננת": "Filtered media",
+        "שרתים מוגבלים": "Silenced servers",
+        "Serveurs suspendus": "Suspended servers",
+        "Médias filtrés": "Filtered media",
+        "Serveurs limités": "Silenced servers",
+    }
+
+    try:
+        doc = BeautifulSoup(
+            get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
+            "html.parser",
+        )
+    except:
+        print("ERROR: Cannot fetch from domain:", domain)
+        return {}
+
+    for header in doc.find_all("h3"):
+        header_text = header.text
+        if header_text in translations:
+            header_text = translations[header_text]
+        if header_text in blocks:
+            # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
+            for line in header.find_all_next("table")[0].find_all("tr")[1:]:
+                blocks[header_text].append(
+                    {
+                        "domain": line.find("span").text,
+                        "hash": line.find("span")["title"][9:],
+                        "reason": line.find_all("td")[1].text.strip(),
+                    }
+                )
+    return {
+        "reject": blocks["Suspended servers"],
+        "media_removal": blocks["Filtered media"],
+        "followers_only": blocks["Limited servers"]
+        + blocks["Silenced servers"],
+    }
+
+def get_friendica_blocks(domain: str) -> dict:
+    blocks = []
+
+    try:
+        doc = BeautifulSoup(
+            get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
+            "html.parser",
+        )
+    except:
+        return {}
+
+    blocklist = doc.find(id="about_blocklist")
+
+    # Prevents exceptions:
+    if blocklist is None:
+        print("Instance has no block list:", domain)
+        return {}
+
+    for line in blocklist.find("table").find_all("tr")[1:]:
+        blocks.append(
+            {
+                "domain": line.find_all("td")[0].text.strip(),
+                "reason": line.find_all("td")[1].text.strip()
+            }
+        )
+
+    return {
+        "reject": blocks
+    }
+
+def get_misskey_blocks(domain: str) -> dict:
+    blocks = {
+        "suspended": [],
+        "blocked": []
+    }
+
+    try:
+        counter = 0
+        step = 99
+        while True:
+            # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
+            try:
+                if counter == 0:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                else:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                for instance in doc:
+                    # just in case
+                    if instance["isSuspended"]:
+                        blocks["suspended"].append(
+                            {
+                                "domain": instance["host"],
+                                # no reason field, nothing
+                                "reason": ""
+                            }
+                        )
+                counter = counter + step
+            except:
+                counter = 0
+                break
+
+        while True:
+            # same shit, different asshole ("blocked" aka full suspend)
+            try:
+                if counter == 0:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                else:
+                    doc = post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
+                    if doc == []: raise
+                for instance in doc:
+                    if instance["isBlocked"]:
+                        blocks["blocked"].append(
+                            {
+                                "domain": instance["host"],
+                                "reason": ""
+                            }
+                        )
+                counter = counter + step
+            except:
+                counter = 0
+                break
+
+        return {
+            "reject": blocks["blocked"],
+            "followers_only": blocks["suspended"]
+        }
+
+    except:
+        return {}
+
+def tidyup(domain: str) -> str:
+    # some retards put their blocks in variable case
+    domain = domain.lower()
+    # other retards put the port
+    domain = re.sub("\:\d+$", "", domain)
+    # bigger retards put the schema in their blocklist, sometimes even without slashes
+    domain = re.sub("^https?\:(\/*)", "", domain)
+    # and trailing slash
+    domain = re.sub("\/$", "", domain)
+    # and the @
+    domain = re.sub("^\@", "", domain)
+    # the biggest retards of them all try to block individual users
+    domain = re.sub("(.+)\@", "", domain)
+    return domain
index 58f86599f694b050381666b8032797c7a778da37..b5cb55d7ed806be959201d67e74fcc361c4c2620 100644 (file)
 from reqto import get
 from reqto import post
 from hashlib import sha256
-import sqlite3
 from bs4 import BeautifulSoup
 from json import dumps
 from json import loads
 import re
 from time import time
 import itertools
-
-with open("config.json") as f:
-    config = loads(f.read())
-
-headers = {
-    "user-agent": config["useragent"]
-}
-
-def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
-    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
-    c.execute(
-        "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
-        (
-            reason,
-            blocker,
-            blocked,
-            block_level
-        ),
-    )
-
-def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
-    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
-    c.execute(
-        "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
-        (
-            last_seen,
-            blocker,
-            blocked,
-            block_level
-        )
-    )
-
-def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
-    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
-    c.execute(
-        "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
-         (
-             blocker,
-             blocked,
-             reason,
-             block_level,
-             first_added,
-             last_seen
-         ),
-    )
-
-def add_instance (domain: str):
-    print("--- Adding new instance:", domain)
-    c.execute(
-        "INSERT INTO instances SELECT ?, ?, ?",
-        (
-           blocked,
-           get_hash(blocked),
-           get_type(blocked)
-        ),
-    )
-
-def send_bot_post(instance: str, blocks: dict):
-    message = instance + " has blocked the following instances:\n\n"
-    truncated = False
-    if len(blocks) > 20:
-        truncated = True
-        blocks = blocks[0 : 19]
-    for block in blocks:
-        if block["reason"] == None or block["reason"] == '':
-            message = message + block["blocked"] + " with unspecified reason\n"
-        else:
-            if len(block["reason"]) > 420:
-                block["reason"] = block["reason"][0:419] + "[…]"
-            message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
-    if truncated:
-        message = message + "(the list has been truncated to the first 20 entries)"
-
-    botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
-    req = post(f"{config['bot_instance']}/api/v1/statuses",
-        data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
-        headers=botheaders, timeout=10).json()
-    return True
-
-def get_mastodon_blocks(domain: str) -> dict:
-    blocks = {
-        "Suspended servers": [],
-        "Filtered media": [],
-        "Limited servers": [],
-        "Silenced servers": [],
-    }
-
-    translations = {
-        "Silenced instances": "Silenced servers",
-        "Suspended instances": "Suspended servers",
-        "Gesperrte Server": "Suspended servers",
-        "Gefilterte Medien": "Filtered media",
-        "Stummgeschaltete Server": "Silenced servers",
-        "停止済みのサーバー": "Suspended servers",
-        "メディアを拒否しているサーバー": "Filtered media",
-        "サイレンス済みのサーバー": "Silenced servers",
-        "שרתים מושעים": "Suspended servers",
-        "מדיה מסוננת": "Filtered media",
-        "שרתים מוגבלים": "Silenced servers",
-        "Serveurs suspendus": "Suspended servers",
-        "Médias filtrés": "Filtered media",
-        "Serveurs limités": "Silenced servers",
-    }
-
-    try:
-        doc = BeautifulSoup(
-            get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
-            "html.parser",
-        )
-    except:
-        return {}
-
-    for header in doc.find_all("h3"):
-        header_text = header.text
-        if header_text in translations:
-            header_text = translations[header_text]
-        if header_text in blocks:
-            # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
-            for line in header.find_all_next("table")[0].find_all("tr")[1:]:
-                blocks[header_text].append(
-                    {
-                        "domain": line.find("span").text,
-                        "hash": line.find("span")["title"][9:],
-                        "reason": line.find_all("td")[1].text.strip(),
-                    }
-                )
-    return {
-        "reject": blocks["Suspended servers"],
-        "media_removal": blocks["Filtered media"],
-        "followers_only": blocks["Limited servers"]
-        + blocks["Silenced servers"],
-    }
-
-def get_friendica_blocks(domain: str) -> dict:
-    blocks = []
-
-    try:
-        doc = BeautifulSoup(
-            get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
-            "html.parser",
-        )
-    except:
-        return {}
-
-    blocklist = doc.find(id="about_blocklist")
-
-    # Prevents exceptions:
-    if blocklist is None:
-        print("Instance has no block list:", domain)
-        return {}
-
-    for line in blocklist.find("table").find_all("tr")[1:]:
-        blocks.append(
-            {
-                "domain": line.find_all("td")[0].text.strip(),
-                "reason": line.find_all("td")[1].text.strip()
-            }
-        )
-
-    return {
-        "reject": blocks
-    }
-
-def get_misskey_blocks(domain: str) -> dict:
-    blocks = {
-        "suspended": [],
-        "blocked": []
-    }
-
-    try:
-        counter = 0
-        step = 99
-        while True:
-            # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
-            try:
-                if counter == 0:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                else:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                for instance in doc:
-                    # just in case
-                    if instance["isSuspended"]:
-                        blocks["suspended"].append(
-                            {
-                                "domain": instance["host"],
-                                # no reason field, nothing
-                                "reason": ""
-                            }
-                        )
-                counter = counter + step
-            except:
-                counter = 0
-                break
-
-        while True:
-            # same shit, different asshole ("blocked" aka full suspend)
-            try:
-                if counter == 0:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                else:
-                    doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
-                    if doc == []: raise
-                for instance in doc:
-                    if instance["isBlocked"]:
-                        blocks["blocked"].append(
-                            {
-                                "domain": instance["host"],
-                                "reason": ""
-                            }
-                        )
-                counter = counter + step
-            except:
-                counter = 0
-                break
-
-        return {
-            "reject": blocks["blocked"],
-            "followers_only": blocks["suspended"]
-        }
-
-    except:
-        return {}
-
-def get_hash(domain: str) -> str:
-    return sha256(domain.encode("utf-8")).hexdigest()
-
-
-def get_type(domain: str) -> str:
-    try:
-        res = get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
-        if res.ok and "text/html" in res.headers["content-type"]:
-            res = get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
-        if res.ok:
-            if res.json()["software"]["name"] in ["akkoma", "rebased"]:
-                return "pleroma"
-            elif res.json()["software"]["name"] in ["hometown", "ecko"]:
-                return "mastodon"
-            elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
-                return "misskey"
-            else:
-                return res.json()["software"]["name"]
-        elif res.status_code == 404:
-            res = get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
-        if res.ok:
-            return "mastodon"
-    except:
-        return None
-
-def tidyup(domain: str) -> str:
-    # some retards put their blocks in variable case
-    domain = domain.lower()
-    # other retards put the port
-    domain = re.sub("\:\d+$", "", domain)
-    # bigger retards put the schema in their blocklist, sometimes even without slashes
-    domain = re.sub("^https?\:(\/*)", "", domain)
-    # and trailing slash
-    domain = re.sub("\/$", "", domain)
-    # and the @
-    domain = re.sub("^\@", "", domain)
-    # the biggest retards of them all try to block individual users
-    domain = re.sub("(.+)\@", "", domain)
-    return domain
-
-conn = sqlite3.connect("blocks.db")
-c = conn.cursor()
+from fba import c
+import fba
 
 c.execute(
     "SELECT domain, software FROM instances WHERE domain='tooting.intensifi.es'"
@@ -288,7 +17,7 @@ c.execute(
 
 for blocker, software in c.fetchall():
     blockdict = []
-    blocker = tidyup(blocker)
+    blocker = fba.tidyup(blocker)
     if software == "pleroma":
         print(blocker)
         try:
@@ -302,10 +31,10 @@ for blocker, software in c.fetchall():
                     **{"quarantined_instances": federation["quarantined_instances"]}}
                 ).items():
                     for blocked in blocks:
-                        blocked = tidyup(blocked)
+                        blocked = fba.tidyup(blocked)
 
                         if blocked == "":
-                            print("WARNING: blocked is empty after tidyup():", blocker, block_level)
+                            print("WARNING: blocked is empty after fba.tidyup():", blocker, block_level)
                             continue
 
                         if blocked.count("*") > 1:
@@ -344,7 +73,7 @@ for blocker, software in c.fetchall():
                                     })
                         else:
                             update_last_seen(timestamp, blocker, blocked, block_level)
-            conn.commit()
+            fba.conn.commit()
             # Reasons
             if "mrf_simple_info" in federation:
                 for block_level, info in (
@@ -354,10 +83,10 @@ for blocker, software in c.fetchall():
                     else {})}
                 ).items():
                     for blocked, reason in info.items():
-                        blocked = tidyup(blocked)
+                        blocked = fba.tidyup(blocked)
 
                         if blocked == "":
-                            print("WARNING: blocked is empty after tidyup():", blocker, block_level)
+                            print("WARNING: blocked is empty after fba.tidyup():", blocker, block_level)
                             continue
 
                         if blocked.count("*") > 1:
@@ -376,7 +105,7 @@ for blocker, software in c.fetchall():
                             if entry["blocked"] == blocked:
                                 entry["reason"] = reason["reason"]
 
-            conn.commit()
+            fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
     elif software == "mastodon":
@@ -419,12 +148,12 @@ for blocker, software in c.fetchall():
                     else:
                         print("WARNING: Unknown severity:", block['severity'], block['domain'])
             except:
-                json = get_mastodon_blocks(blocker)
+                json = fba.get_mastodon_blocks(blocker)
 
             for block_level, blocks in json.items():
                 for instance in blocks:
                     blocked, blocked_hash, reason = instance.values()
-                    blocked = tidyup(blocked)
+                    blocked = fba.tidyup(blocked)
 
                     if blocked.count("*") <= 1:
                         c.execute(
@@ -466,20 +195,20 @@ for blocker, software in c.fetchall():
                     if reason != '':
                         update_block_reason(reason, blocker, blocked if blocked.count("*") <= 1 else blocked_hash, block_level)
 
-            conn.commit()
+            fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
     elif software == "friendica" or software == "misskey":
         print(blocker)
         try:
             if software == "friendica":
-                json = get_friendica_blocks(blocker)
+                json = fba.get_friendica_blocks(blocker)
             elif software == "misskey":
-                json = get_misskey_blocks(blocker)
+                json = fba.get_misskey_blocks(blocker)
             for block_level, blocks in json.items():
                 for instance in blocks:
                     blocked, reason = instance.values()
-                    blocked = tidyup(blocked)
+                    blocked = fba.tidyup(blocked)
 
                     print("BEFORE-blocked:", blocked)
                     if blocked.count("*") > 0:
@@ -528,7 +257,7 @@ for blocker, software in c.fetchall():
                     if reason != '':
                         update_block_reason(reason, blocker, blocked, block_level)
 
-            conn.commit()
+            fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
     elif software == "gotosocial":
@@ -586,7 +315,7 @@ for blocker, software in c.fetchall():
                         for entry in blockdict:
                             if entry["blocked"] == blocked:
                                 entry["reason"] = reason
-                conn.commit()
+                fba.conn.commit()
         except Exception as e:
             print("error:", e, blocker)
 
@@ -594,4 +323,4 @@ for blocker, software in c.fetchall():
         send_bot_post(blocker, blockdict)
     blockdict = []
 
-conn.close()
+fba.conn.close()
index 20df590b761c1def4fdfc84e9b1b7586cfe352bd..5b0bb6ee84966219bb17cb8e0f1c391f68d58728 100644 (file)
@@ -1,72 +1,17 @@
-from reqto import get
-from hashlib import sha256
 import sqlite3
 import sys
 import json
-
-with open("config.json") as f:
-    config = json.loads(f.read())
+from fba import c
+import fba
 
 domain = sys.argv[1]
 
-blacklist = [
-    "activitypub-troll.cf",
-    "gab.best",
-    "4chan.icu",
-    "social.shrimpcam.pw",
-    "mastotroll.netz.org"
-]
-
-headers = {
-    "user-agent": config["useragent"]
-}
-
-def get_hash(domain: str) -> str:
-    return sha256(domain.encode("utf-8")).hexdigest()
-
-def get_peers(domain: str) -> str:
-    try:
-        res = get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
-        return res.json()
-    except:
-        print("WARNING: Cannot fetch peers:", domain)
-        return None
-
-peerlist = get_peers(domain)
+peerlist = fba.get_peers(domain)
 
 if (peerlist is None):
     print("FATAL: CANNOT FETCH PEERS:", domain)
     sys.exit(255)
 
-def get_type(instdomain: str) -> str:
-    try:
-        res = get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
-        if res.status_code == 404:
-            res = get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
-        if res.ok and "text/html" in res.headers["content-type"]:
-            res = get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
-        if res.ok:
-            if res.json()["software"]["name"] in ["akkoma", "rebased"]:
-                return "pleroma"
-            elif res.json()["software"]["name"] in ["hometown", "ecko"]:
-                return "mastodon"
-            elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
-                return "misskey"
-            else:
-                return res.json()["software"]["name"]
-        elif res.status_code == 404:
-            res = get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
-        if res.ok:
-            return "mastodon"
-    except:
-        return None
-
-
-conn = sqlite3.connect("blocks.db")
-c = conn.cursor()
-
 c.execute(
     "SELECT domain FROM instances WHERE 1"
 )
@@ -75,11 +20,12 @@ for instance in peerlist:
     instance = instance.lower()
 
     blacklisted = False
-    for domain in blacklist:
+    for domain in fba.blacklist:
         if domain in instance:
             blacklisted = True
 
     if blacklisted:
+        print("domain is blacklisted:", domain)
         continue
 
     print(instance)
@@ -87,12 +33,12 @@ for instance in peerlist:
         c.execute(
             "SELECT domain FROM instances WHERE domain = ?", (instance,)
         )
+
         if c.fetchone() == None:
-            c.execute(
-                "INSERT INTO instances SELECT ?, ?, ?",
-                (instance, get_hash(instance), get_type(instance)),
-            )
-        conn.commit()
+            fba.add_instance(instance)
+
+        fba.conn.commit()
     except Exception as e:
         print("error:", e, instance)
-conn.close()
+
+fba.conn.close()