-from bs4 import BeautifulSoup
-from hashlib import sha256
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import reqto
-import re
-import sqlite3
-import json
import sys
-import time
+from fba import boot
-with open("config.json") as f:
- config = json.loads(f.read())
+# Init parser
+boot.init_parser()
-blacklist = [
- "activitypub-troll.cf",
- "gab.best",
- "4chan.icu",
- "social.shrimpcam.pw",
- "mastotroll.netz.org",
- "ngrok.io",
-]
+# Run command
+status = boot.run_command()
-pending_errors = {
-}
+# Shutdown again
+boot.shutdown()
-nodeinfos = [
- "http://nodeinfo.diaspora.software/ns/schema/2.1",
- "http://nodeinfo.diaspora.software/ns/schema/2.0",
- "http://nodeinfo.diaspora.software/ns/schema/1.1",
- "http://nodeinfo.diaspora.software/ns/schema/1.0",
-]
-
-headers = {
- "user-agent": config["useragent"]
-}
-
-connection = sqlite3.connect("blocks.db")
-cursor = connection.cursor()
-
-def is_blacklisted(domain: str) -> bool:
- blacklisted = False
- for peer in blacklist:
- if peer in domain:
- blacklisted = True
-
- return blacklisted
-
-def get_hash(domain: str) -> str:
- return sha256(domain.encode("utf-8")).hexdigest()
-
-def update_last_blocked(domain: str):
- # NOISY-DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
- try:
- cursor.execute("UPDATE instances SET last_blocked = ?, last_updated = ? WHERE domain = ?", [
- time.time(),
- time.time(),
- domain
- ])
-
- if cursor.rowcount == 0:
- print("WARNING: Did not update any rows:", domain)
-
- except BaseException as e:
- print("ERROR: failed SQL query:", domain, e)
- sys.exit(255)
-
-def update_last_error(domain: str, res: any):
- # NOISY-DEBUG: print("DEBUG: domain,res.status_code:", domain, res.status_code, res.reason)
- try:
- if type(res) is str:
- cursor.execute("UPDATE instances SET last_status_code = 999, last_error_details = ?, last_updated = ? WHERE domain = ?", [
- res,
- time.time(),
- domain
- ])
- else:
- cursor.execute("UPDATE instances SET last_status_code = ?, last_error_details = ?, last_updated = ? WHERE domain = ?", [
- res.status_code,
- res.reason,
- time.time(),
- domain
- ])
-
- if cursor.rowcount == 0:
- # NOISY-DEBUG: print("DEBUG: Did not update any rows:", domain)
- pending_errors[domain] = res
-
- except BaseException as e:
- print("ERROR: failed SQL query:", domain, e)
- sys.exit(255)
-
-def update_last_nodeinfo(domain: str):
- # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
- try:
- cursor.execute("UPDATE instances SET last_nodeinfo = ?, last_updated = ? WHERE domain = ?", [
- time.time(),
- time.time(),
- domain
- ])
-
- if cursor.rowcount == 0:
- print("WARNING: Did not update any rows:", domain)
-
- except BaseException as e:
- print("ERROR: failed SQL query:", domain, e)
- sys.exit(255)
-
- connection.commit()
-
-def get_peers(domain: str) -> list:
- # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
- peers = None
-
- try:
- res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
- if not res.ok or res.status_code >= 400:
- print("WARNING: Cannot fetch peers:", domain)
- update_last_error(domain, res)
- else:
- # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
- peers = res.json()
-
- except:
- print("WARNING: Some error during get():", domain)
-
- update_last_nodeinfo(domain)
-
- # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
- return peers
-
-def post_json_api(domain: str, path: str, data: str) -> list:
- # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
- json = {}
- try:
- res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
- if not res.ok or res.status_code >= 400:
- print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
- update_last_error(domain, res)
- raise
-
- update_last_nodeinfo(domain)
- json = res.json()
- except:
- print("WARNING: Some error during post():", domain, path, data)
-
- # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
- return json
-
-def fetch_nodeinfo(domain: str) -> list:
- # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
-
- nodeinfo = fetch_wellknown_nodeinfo(domain)
- # NOISY-DEBUG: print("DEBUG: nodeinfo:", len(nodeinfo))
-
- if len(nodeinfo) > 0:
- # NOISY-DEBUG: print("DEBUG: Returning auto-discovered nodeinfo:", len(nodeinfo))
- return nodeinfo
-
- requests = [
- f"https://{domain}/nodeinfo/2.1.json",
- f"https://{domain}/nodeinfo/2.1",
- f"https://{domain}/nodeinfo/2.0.json",
- f"https://{domain}/nodeinfo/2.0",
- f"https://{domain}/nodeinfo/1.0",
- f"https://{domain}/api/v1/instance"
- ]
-
- json = {}
- for request in requests:
- try:
- # NOISY-DEBUG: print("DEBUG: Fetching request:", request)
- res = reqto.get(request, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
- # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
- if res.ok and res.json() is not None:
- # NOISY-DEBUG: print("DEBUG: Success:", request)
- json = res.json()
- break
- elif not res.ok or res.status_code >= 400:
- print("WARNING: Failed fetching nodeinfo from domain:", domain)
- update_last_error(domain, res)
- continue
-
- except:
- # NOISY-DEBUG: print("DEBUG: Cannot fetch API request:", request)
- pass
-
- # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
- if json is None or len(json) == 0:
- print("WARNING: Failed fetching nodeinfo from domain:", domain)
- update_last_error(domain, "Cannot fetch nodeinfo")
-
- # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
- return json
-
-def fetch_wellknown_nodeinfo(domain: str) -> list:
- # NOISY-DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
- json = {}
-
- try:
- res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
- # NOISY-DEBUG: print("DEBUG: domain,res.ok:", domain, res.ok)
- if res.ok and res.json() is not None:
- nodeinfo = res.json()
- # NOISY-DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain)
- if "links" in nodeinfo:
- # NOISY-DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"]))
- for link in nodeinfo["links"]:
- # NOISY-DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
- if link["rel"] in nodeinfos:
- # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
- res = reqto.get(link["href"])
- # NOISY-DEBUG: print("DEBUG: href,res.ok,res.status_code:", link["href"], res.ok, res.status_code)
- if res.ok and res.json() is not None:
- # NOISY-DEBUG: print("DEBUG: Found JSON nodeinfo():", len(res.json()))
- json = res.json()
- break
- else:
- print("WARNING: Unknown 'rel' value:", domain, link["rel"])
- else:
- print("WARNING: nodeinfo does not contain 'links':", domain)
-
- except:
- print("WARNING: Failed fetching .well-known info:", domain)
- update_last_error(domain, "Cannot fetch .well-known")
- pass
-
- # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
- return json
-
-def determine_software(domain: str) -> str:
- # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
- software = None
-
- json = fetch_nodeinfo(domain)
- # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
-
- if json is None or len(json) == 0:
- # NOISY-DEBUG: print("DEBUG: Could not determine software type:", domain)
- return None
-
- # NOISY-DEBUG: print("DEBUG: json():", len(json), json)
- if "software" not in json or "name" not in json["software"]:
- print("WARNING: JSON response does not include [software][name], guessing ...")
- found = 0
- for element in {"uri", "title", "description", "email", "version", "urls", "stats", "thumbnail", "languages", "contact_account"}:
- if element in json:
- found = found + 1
-
- # NOISY-DEBUG: print("DEBUG: Found elements:", found)
- if found == len(json):
- # NOISY-DEBUG: print("DEBUG: Maybe is Mastodon:", domain)
- return "mastodon"
-
- print("WARNING: Cannot guess software type:", domain, found, len(json))
- return None
-
- software = tidyup(json["software"]["name"])
-
- # NOISY-DEBUG: print("DEBUG: tidyup software:", software)
- if software in ["akkoma", "rebased"]:
- # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, software)
- software = "pleroma"
- elif software in ["hometown", "ecko"]:
- # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, software)
- software = "mastodon"
- elif software in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
- # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, software)
- software = "misskey"
- elif software.find("/") > 0:
- print("WARNING: Spliting of path:", software)
- software = software.split("/")[-1];
- elif software.find("|") > 0:
- print("WARNING: Spliting of path:", software)
- software = software.split("|")[0].strip();
-
- if software == "":
- print("WARNING: tidyup() left no software name behind:", domain)
- software = None
-
- # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
- return software
-
-def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
- # NOISY: # NOISY-DEBUG: print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
- try:
- cursor.execute(
- "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
- (
- reason,
- time.time(),
- blocker,
- blocked,
- block_level
- ),
- )
-
- if cursor.rowcount == 0:
- print("WARNING: Did not update any rows:", domain)
-
- except:
- print("ERROR: failed SQL query:", reason, blocker, blocked, block_level)
- sys.exit(255)
-
-def update_last_seen(blocker: str, blocked: str, block_level: str):
- # NOISY: # NOISY-DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
- try:
- cursor.execute(
- "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
- (
- time.time(),
- blocker,
- blocked,
- block_level
- )
- )
-
- if cursor.rowcount == 0:
- print("WARNING: Did not update any rows:", domain)
-
- except:
- print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level)
- sys.exit(255)
-
-def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
- # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
- if blocker.find("@") > 0:
- print("WARNING: Bad blocker:", blocker)
- raise
- elif blocked.find("@") > 0:
- print("WARNING: Bad blocked:", blocked)
- raise
-
- print("INFO: New block:", blocker, blocked, reason, block_level, first_added, last_seen)
- try:
- cursor.execute(
- "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
- (
- blocker,
- blocked,
- reason,
- block_level,
- time.time(),
- time.time()
- ),
- )
-
- except:
- print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen)
- sys.exit(255)
-
-def add_instance(domain: str, origin: str, originator: str):
- # NOISY-DEBUG: print("DEBUG: domain,origin:", domain, origin, originator)
- if domain.find("@") > 0:
- print("WARNING: Bad domain name:", domain)
- raise
- elif origin is not None and origin.find("@") > 0:
- print("WARNING: Bad origin name:", origin)
- raise
-
- software = determine_software(domain)
- # NOISY-DEBUG: print("DEBUG: Determined software:", software)
-
- print(f"INFO: Adding new instance {domain} (origin: {origin})")
- try:
- cursor.execute(
- "INSERT INTO instances (domain, origin, originator, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
- (
- domain,
- origin,
- originator,
- get_hash(domain),
- software,
- time.time()
- ),
- )
-
- if domain in pending_errors:
- print("DEBUG: domain has pending error be updated:", domain)
- update_last_error(domain, pending_errors[domain])
- del pending_errors[domain]
-
- except BaseException as e:
- print("ERROR: failed SQL query:", domain, e)
- sys.exit(255)
- else:
- # NOISY-DEBUG: print("DEBUG: Updating nodeinfo for domain:", domain)
- update_last_nodeinfo(domain)
-
-def send_bot_post(instance: str, blocks: dict):
- message = instance + " has blocked the following instances:\n\n"
- truncated = False
-
- if len(blocks) > 20:
- truncated = True
- blocks = blocks[0 : 19]
-
- for block in blocks:
- if block["reason"] == None or block["reason"] == '':
- message = message + block["blocked"] + " with unspecified reason\n"
- else:
- if len(block["reason"]) > 420:
- block["reason"] = block["reason"][0:419] + "[…]"
-
- message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
-
- if truncated:
- message = message + "(the list has been truncated to the first 20 entries)"
-
- botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
-
- req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
- data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
- headers=botheaders, timeout=10).json()
-
- return True
-
-def get_mastodon_blocks(domain: str) -> dict:
- # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
- blocks = {
- "Suspended servers": [],
- "Filtered media" : [],
- "Limited servers" : [],
- "Silenced servers" : [],
- }
-
- translations = {
- "Silenced instances" : "Silenced servers",
- "Suspended instances" : "Suspended servers",
- "Gesperrte Server" : "Suspended servers",
- "Gefilterte Medien" : "Filtered media",
- "Stummgeschaltete Server" : "Silenced servers",
- "停止済みのサーバー" : "Suspended servers",
- "メディアを拒否しているサーバー": "Filtered media",
- "サイレンス済みのサーバー" : "Silenced servers",
- "שרתים מושעים" : "Suspended servers",
- "מדיה מסוננת" : "Filtered media",
- "שרתים מוגבלים" : "Silenced servers",
- "Serveurs suspendus" : "Suspended servers",
- "Médias filtrés" : "Filtered media",
- "Serveurs limités" : "Silenced servers",
- }
-
- try:
- doc = BeautifulSoup(
- reqto.get(f"https://{domain}/about/more", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
- "html.parser",
- )
- except:
- print("ERROR: Cannot fetch from domain:", domain)
- return {}
-
- for header in doc.find_all("h3"):
- header_text = header.text
-
- if header_text in translations:
- header_text = translations[header_text]
-
- if header_text in blocks:
- # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
- for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocks[header_text].append(
- {
- "domain": tidyup(line.find("span").text),
- "hash" : tidyup(line.find("span")["title"][9:]),
- "reason": tidyup(line.find_all("td")[1].text),
- }
- )
-
- # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
- return {
- "reject" : blocks["Suspended servers"],
- "media_removal" : blocks["Filtered media"],
- "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
- }
-
-def get_friendica_blocks(domain: str) -> dict:
- # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
- blocks = []
-
- try:
- doc = BeautifulSoup(
- reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
- "html.parser",
- )
- except:
- print("WARNING: Failed to fetch /friendica from domain:", domain)
- return {}
-
- blocklist = doc.find(id="about_blocklist")
-
- # Prevents exceptions:
- if blocklist is None:
- # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
- return {}
-
- for line in blocklist.find("table").find_all("tr")[1:]:
- blocks.append({
- "domain": tidyup(line.find_all("td")[0].text),
- "reason": tidyup(line.find_all("td")[1].text)
- })
-
- # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
- return {
- "reject": blocks
- }
-
-def get_misskey_blocks(domain: str) -> dict:
- # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
- blocks = {
- "suspended": [],
- "blocked" : []
- }
-
- try:
- counter = 0
- step = 99
- while True:
- # iterating through all "suspended" (follow-only in its terminology)
- # instances page-by-page, since that troonware doesn't support
- # sending them all at once
- try:
- if counter == 0:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "suspended": True,
- "limit" : step
- }))
- else:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "suspended": True,
- "limit" : step,
- "offset" : counter-1
- }))
-
- # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
- if len(doc) == 0:
- # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
- break
-
- for instance in doc:
- # just in case
- if instance["isSuspended"]:
- blocks["suspended"].append(
- {
- "domain": tidyup(instance["host"]),
- # no reason field, nothing
- "reason": ""
- }
- )
-
- if len(doc) < step:
- # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
- break
-
- # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
- counter = counter + step
-
- except:
- print("WARNING: Caught error, exiting loop:", domain)
- counter = 0
- break
-
- while True:
- # same shit, different asshole ("blocked" aka full suspend)
- try:
- if counter == 0:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain,"/api/federation/instances", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "blocked": True,
- "limit" : step
- }))
- else:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain,"/api/federation/instances", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "blocked": True,
- "limit" : step,
- "offset" : counter-1
- }))
-
- # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
- if len(doc) == 0:
- # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
- break
-
- for instance in doc:
- if instance["isBlocked"]:
- blocks["blocked"].append({
- "domain": tidyup(instance["host"]),
- "reason": ""
- })
-
- if len(doc) < step:
- # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
- break
-
- # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
- counter = counter + step
-
- except:
- counter = 0
- break
-
- # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
- return {
- "reject" : blocks["blocked"],
- "followers_only": blocks["suspended"]
- }
-
- except:
- print("WARNING: API request failed for domain:", domain)
- return {}
-
-def tidyup(string: str) -> str:
- # some retards put their blocks in variable case
- string = string.lower().strip()
-
- # other retards put the port
- string = re.sub("\:\d+$", "", string)
-
- # bigger retards put the schema in their blocklist, sometimes even without slashes
- string = re.sub("^https?\:(\/*)", "", string)
-
- # and trailing slash
- string = re.sub("\/$", "", string)
-
- # and the @
- string = re.sub("^\@", "", string)
-
- # the biggest retards of them all try to block individual users
- string = re.sub("(.+)\@", "", string)
-
- return string
+# Exit with status code
+sys.exit(status)