]> git.mxchange.org Git - fba.git/blobdiff - fba.py
Continued:
[fba.git] / fba.py
diff --git a/fba.py b/fba.py
old mode 100644 (file)
new mode 100755 (executable)
index 8a89b86..d131b8a
--- a/fba.py
+++ b/fba.py
-from bs4 import BeautifulSoup
-from hashlib import sha256
+#!venv/bin/python3
+# -*- coding: utf-8 -*-
+
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-import reqto
-import re
-import sqlite3
-import json
 import sys
-import time
+from fba import boot
 
-with open("config.json") as f:
-    config = json.loads(f.read())
+# Init parser
+boot.init_parser()
 
-blacklist = [
-    "activitypub-troll.cf",
-    "gab.best",
-    "4chan.icu",
-    "social.shrimpcam.pw",
-    "mastotroll.netz.org",
-    "ngrok.io",
-]
+# Run command
+status = boot.run_command()
 
-headers = {
-    "user-agent": config["useragent"]
-}
+# Shutdown again
+boot.shutdown()
 
-connection = sqlite3.connect("blocks.db")
-cursor = connection.cursor()
-
-def is_blacklisted(domain: str) -> bool:
-    # NOISY-DEBUG: print("DEBUG: Checking blacklist for domain:", domain)
-    blacklisted = False
-    for peer in blacklist:
-        # NOISY-DEBUG: print("DEBUG: domain,peer:", domain, peer)
-        if peer in domain:
-            blacklisted = True
-
-    # NOISY-DEBUG: print("DEBUG: blacklisted:", blacklisted)
-    return blacklisted
-
-def get_hash(domain: str) -> str:
-    # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
-    return sha256(domain.encode("utf-8")).hexdigest()
-
-def update_last_blocked(domain: str):
-    # NOISY-DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
-
-    try:
-        cursor.execute("UPDATE instances SET last_blocked = ?, last_updated = ? WHERE domain = ?", [
-            time.time(),
-            time.time(),
-            domain
-        ])
-
-    except:
-        print("ERROR: failed SQL query:", domain)
-        sys.exit(255)
-
-def update_last_error(domain: str, res: any):
-    # NOISY-DEBUG: print("DEBUG: domain,res.status_code:", domain, res.status_code, res.reason)
-    try:
-        cursor.execute("UPDATE instances SET last_status_code = ?, last_error_details = ?, last_updated = ? WHERE domain = ?", [
-            res.status_code,
-            res.reason,
-            time.time(),
-            domain
-        ])
-
-    except:
-        print("ERROR: failed SQL query:", domain)
-        sys.exit(255)
-
-def update_last_nodeinfo(domain: str):
-    # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
-    try:
-        cursor.execute("UPDATE instances SET last_nodeinfo = ?, last_updated = ? WHERE domain = ?", [
-            time.time(),
-            time.time(),
-            domain
-        ])
-
-    except:
-        print("ERROR: failed SQL query:", domain)
-        sys.exit(255)
-
-    connection.commit()
-
-def get_peers(domain: str) -> list:
-    # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
-    peers = None
-
-    try:
-        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
-        if not res.ok or res.status_code >= 400:
-            print("WARNING: Cannot fetch peers:", domain)
-            update_last_error(domain, res)
-        else:
-            # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
-            peers = res.json()
-
-    except:
-        print("WARNING: Some error during get():", domain)
-
-    update_last_nodeinfo(domain)
-
-    # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
-    return peers
-
-def post_json_api(domain: str, path: str, data: str) -> list:
-    # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
-    try:
-        res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
-        if not res.ok or res.status_code >= 400:
-            print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
-            update_last_error(domain, res)
-            raise
-
-        update_last_nodeinfo(domain)
-        json = res.json()
-    except:
-        print("WARNING: Some error during post():", domain, path, data)
-
-    # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
-    return json
-
-def fetch_nodeinfo(domain: str) -> list:
-    # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
-
-    requests = [
-       f"https://{domain}/nodeinfo/2.1.json",
-       f"https://{domain}/nodeinfo/2.0",
-       f"https://{domain}/nodeinfo/2.0.json",
-       f"https://{domain}/nodeinfo/2.1",
-       f"https://{domain}/api/v1/instance"
-    ]
-
-    json = None
-    for request in requests:
-        # NOISY-DEBUG: print("DEBUG: Fetching request:", request)
-        res = reqto.get(request, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
-        # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
-        if res.ok and res.json() is not None:
-            # NOISY-DEBUG: print("DEBUG: Success:", request)
-            json = res.json()
-            break
-        elif not res.ok or res.status_code >= 400:
-            # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
-            update_last_error(domain, res)
-            continue
-
-    if json is None:
-        print("WARNING: Failed fetching nodeinfo from domain:", domain)
-
-    # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
-    update_last_nodeinfo(domain)
-
-    # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
-    return json
-
-def determine_software(domain: str) -> str:
-    # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
-    software = None
-
-    try:
-        json = fetch_nodeinfo(domain)
-
-        # NOISY-DEBUG: print("DEBUG: json():", len(json))
-        software = tidyup(json["software"]["name"])
-
-        # NOISY-DEBUG: print("DEBUG: BEFORE software:", software)
-        if software in ["akkoma", "rebased"]:
-            # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, software)
-            software = "pleroma"
-        elif software in ["hometown", "ecko"]:
-            # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, software)
-            software = "mastodon"
-        elif software in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
-            # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, software)
-            software = "misskey"
-        elif software.find("/") > 0:
-            print("WARNING: Spliting of path:", software)
-            software = software.split("/")[-1];
-
-        if software == "":
-            print("WARNING: tidyup() left no software name behind:", domain)
-            software = None
-        # NOISY-DEBUG: print("DEBUG: AFTER software:", software)
-    except:
-        print("WARNING: Could not determine software type:", domain)
-
-    # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
-    return software
-
-def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
-    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
-    try:
-        cursor.execute(
-            "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
-            (
-                reason,
-                time.time(),
-                blocker,
-                blocked,
-                block_level
-            ),
-        )
-
-    except:
-        print("ERROR: failed SQL query:", reason, blocker, blocked, block_level)
-        sys.exit(255)
-
-def update_last_seen(blocker: str, blocked: str, block_level: str):
-    # NOISY: print("--- Updating last_seen for:", blocker, blocked, block_level)
-    try:
-        cursor.execute(
-            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
-            (
-                time.time(),
-                blocker,
-                blocked,
-                block_level
-            )
-        )
-
-    except:
-        print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level)
-        sys.exit(255)
-
-def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
-    # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
-    if blocker.find("@") > 0:
-        print("WARNING: Bad blocker:", blocker)
-        raise
-    elif blocked.find("@") > 0:
-        print("WARNING: Bad blocked:", blocked)
-        raise
-
-    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
-    try:
-        cursor.execute(
-            "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
-             (
-                 blocker,
-                 blocked,
-                 reason,
-                 block_level,
-                 time.time(),
-                 time.time()
-             ),
-        )
-
-    except:
-        print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen)
-        sys.exit(255)
-
-def add_instance(domain: str, origin: str, originator: str):
-    # NOISY-DEBUG: print("DEBUG: domain,origin:", domain, origin, originator)
-    if domain.find("@") > 0:
-        print("WARNING: Bad domain name:", domain)
-        raise
-    elif origin is not None and origin.find("@") > 0:
-        print("WARNING: Bad origin name:", origin)
-        raise
-
-    print(f"--- Adding new instance {domain} (origin: {origin})")
-    try:
-        cursor.execute(
-            "INSERT INTO instances (domain, origin, originator, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
-            (
-               domain,
-               origin,
-               originator,
-               get_hash(domain),
-               determine_software(domain),
-               time.time()
-            ),
-        )
-
-    except:
-        print("ERROR: failed SQL query:", domain)
-        sys.exit(255)
-
-def send_bot_post(instance: str, blocks: dict):
-    message = instance + " has blocked the following instances:\n\n"
-    truncated = False
-
-    if len(blocks) > 20:
-        truncated = True
-        blocks = blocks[0 : 19]
-
-    for block in blocks:
-        if block["reason"] == None or block["reason"] == '':
-            message = message + block["blocked"] + " with unspecified reason\n"
-        else:
-            if len(block["reason"]) > 420:
-                block["reason"] = block["reason"][0:419] + "[…]"
-
-            message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
-
-    if truncated:
-        message = message + "(the list has been truncated to the first 20 entries)"
-
-    botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
-
-    req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
-        data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
-        headers=botheaders, timeout=10).json()
-
-    return True
-
-def get_mastodon_blocks(domain: str) -> dict:
-    # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
-    blocks = {
-        "Suspended servers": [],
-        "Filtered media": [],
-        "Limited servers": [],
-        "Silenced servers": [],
-    }
-
-    translations = {
-        "Silenced instances": "Silenced servers",
-        "Suspended instances": "Suspended servers",
-        "Gesperrte Server": "Suspended servers",
-        "Gefilterte Medien": "Filtered media",
-        "Stummgeschaltete Server": "Silenced servers",
-        "停止済みのサーバー": "Suspended servers",
-        "メディアを拒否しているサーバー": "Filtered media",
-        "サイレンス済みのサーバー": "Silenced servers",
-        "שרתים מושעים": "Suspended servers",
-        "מדיה מסוננת": "Filtered media",
-        "שרתים מוגבלים": "Silenced servers",
-        "Serveurs suspendus": "Suspended servers",
-        "Médias filtrés": "Filtered media",
-        "Serveurs limités": "Silenced servers",
-    }
-
-    try:
-        doc = BeautifulSoup(
-            reqto.get(f"https://{domain}/about/more", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
-            "html.parser",
-        )
-    except:
-        print("ERROR: Cannot fetch from domain:", domain)
-        return {}
-
-    for header in doc.find_all("h3"):
-        header_text = header.text
-
-        if header_text in translations:
-            header_text = translations[header_text]
-
-        if header_text in blocks:
-            # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
-            for line in header.find_all_next("table")[0].find_all("tr")[1:]:
-                blocks[header_text].append(
-                    {
-                        "domain": tidyup(line.find("span").text),
-                        "hash"  : tidyup(line.find("span")["title"][9:]),
-                        "reason": tidyup(line.find_all("td")[1].text),
-                    }
-                )
-
-    # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
-    return {
-        "reject"        : blocks["Suspended servers"],
-        "media_removal" : blocks["Filtered media"],
-        "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
-    }
-
-def get_friendica_blocks(domain: str) -> dict:
-    # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
-    blocks = []
-
-    try:
-        doc = BeautifulSoup(
-            reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
-            "html.parser",
-        )
-    except:
-        print("WARNING: Failed to fetch /friendica from domain:", domain)
-        return {}
-
-    blocklist = doc.find(id="about_blocklist")
-
-    # Prevents exceptions:
-    if blocklist is None:
-        # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
-        return {}
-
-    for line in blocklist.find("table").find_all("tr")[1:]:
-        blocks.append({
-            "domain": tidyup(line.find_all("td")[0].text),
-            "reason": tidyup(line.find_all("td")[1].text)
-        })
-
-    # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
-    return {
-        "reject": blocks
-    }
-
-def get_misskey_blocks(domain: str) -> dict:
-    # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
-    blocks = {
-        "suspended": [],
-        "blocked": []
-    }
-
-    try:
-        counter = 0
-        step = 99
-        while True:
-            # iterating through all "suspended" (follow-only in its terminology)
-            # instances page-by-page, since that troonware doesn't support
-            # sending them all at once
-            try:
-                if counter == 0:
-                    # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
-                    doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
-                        "sort"     : "+caughtAt",
-                        "host"     : None,
-                        "suspended": True,
-                        "limit"    : step
-                    }))
-                else:
-                    # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
-                    doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
-                        "sort"     : "+caughtAt",
-                        "host"     : None,
-                        "suspended": True,
-                        "limit"    : step,
-                        "offset"   : counter-1
-                    }))
-
-                # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
-                if len(doc) == 0:
-                    # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
-                    break
-
-                for instance in doc:
-                    # just in case
-                    if instance["isSuspended"]:
-                        blocks["suspended"].append(
-                            {
-                                "domain": tidyup(instance["host"]),
-                                # no reason field, nothing
-                                "reason": ""
-                            }
-                        )
-
-                if len(doc) < step:
-                    # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
-                    break
-
-                # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
-                counter = counter + step
-
-            except:
-                print("WARNING: Caught error, exiting loop:", domain)
-                counter = 0
-                break
-
-        while True:
-            # same shit, different asshole ("blocked" aka full suspend)
-            try:
-                if counter == 0:
-                    # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
-                    doc = post_json_api(domain,"/api/federation/instances", json.dumps({
-                        "sort"   : "+caughtAt",
-                        "host"   : None,
-                        "blocked": True,
-                        "limit"  : step
-                    }))
-                else:
-                    # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
-                    doc = post_json_api(domain,"/api/federation/instances", json.dumps({
-                        "sort"   : "+caughtAt",
-                        "host"   : None,
-                        "blocked": True,
-                        "limit"  : step,
-                        "offset" : counter-1
-                    }))
-
-                # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
-                if len(doc) == 0:
-                    # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
-                    break
-
-                for instance in doc:
-                    if instance["isBlocked"]:
-                        blocks["blocked"].append({
-                            "domain": tidyup(instance["host"]),
-                            "reason": ""
-                        })
-
-                if len(doc) < step:
-                    # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
-                    break
-
-                # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
-                counter = counter + step
-
-            except:
-                counter = 0
-                break
-
-        # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
-        return {
-            "reject"        : blocks["blocked"],
-            "followers_only": blocks["suspended"]
-        }
-
-    except:
-        print("WARNING: API request failed for domain:", domain)
-        return {}
-
-def tidyup(string: str) -> str:
-    # NOISY-DEBUG: print("DEBUG: BEFORE string:", string)
-
-    # some retards put their blocks in variable case
-    string = string.lower().strip()
-
-    # other retards put the port
-    string = re.sub("\:\d+$", "", string)
-
-    # bigger retards put the schema in their blocklist, sometimes even without slashes
-    string = re.sub("^https?\:(\/*)", "", string)
-
-    # and trailing slash
-    string = re.sub("\/$", "", string)
-
-    # and the @
-    string = re.sub("^\@", "", string)
-
-    # the biggest retards of them all try to block individual users
-    string = re.sub("(.+)\@", "", string)
-
-    # NOISY-DEBUG: print("DEBUG: AFTER string:", string)
-    return string
+# Exit with status code
+sys.exit(status)