-#!/usr/bin/python3
+#!venv/bin/python3
# -*- coding: utf-8 -*-
# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import re
-
from datetime import datetime
from email.utils import format_datetime
from pathlib import Path
+import re
+import validators
+
import fastapi
from fastapi import Request, HTTPException, Query
from fastapi.responses import JSONResponse
from fba.models import blocks
from fba.models import instances
-router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
+# Base URL for all our own requests
+_base_url = config.get("base_url")
+
+router = fastapi.FastAPI(docs_url=_base_url + "/docs", redoc_url=_base_url + "/redoc")
router.mount(
"/static",
StaticFiles(directory=Path(__file__).parent.absolute() / "static"),
templates = Jinja2Templates(directory="templates")
-@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
-def api_info():
- database.cursor.execute("SELECT (SELECT COUNT(domain) AS total_websites FROM instances), (SELECT COUNT(domain) AS supported_instances FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe', 'gotosocial', 'brighteon', 'wildebeest', 'bookwyrm', 'mitra', 'areionskey', 'mammuthus', 'neodb', 'smithereen', 'vebinet')), (SELECT COUNT(blocker) FROM blocks) AS total_blocks, (SELECT COUNT(domain) AS erroneous_instances FROM instances WHERE last_error_details IS NOT NULL)")
+@router.get(_base_url + "/api/info.json", response_class=JSONResponse)
+def api_info() -> None:
+ database.cursor.execute("SELECT ( \
+SELECT COUNT(domain) FROM instances) AS total_websites, \
+(SELECT COUNT(domain) FROM instances WHERE software IN ( \
+ 'pleroma' , 'mastodon', 'lemmy' , 'friendica', 'misskey' , \
+ 'peertube' , 'takahe' , 'gotosocial', 'brighteon', 'wildebeest' , \
+ 'bookwyrm' , 'mitra' , 'areionskey', 'mammuthus', 'neodb' , \
+ 'smithereen', 'vebinet' , 'toki' , 'snac' , 'biblioreads', \
+ 'wordpress' , 'oolong' , 'diaspora' , 'appy' , 'neko' \
+)) AS supported_instances, \
+(SELECT COUNT(blocker) FROM blocks) AS total_blocks, \
+(SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL) AS erroneous_instances")
+
row = database.cursor.fetchone()
- return JSONResponse(status_code=200, content=tuple(row))
+ return JSONResponse(status_code=200, content={
+ "total_websites" : row["total_websites"],
+ "supported_instances": row["supported_instances"],
+ "total_blocks" : row["total_blocks"],
+ "erroneous_instances": row["erroneous_instances"],
+ })
+
-@router.get(config.get("base_url") + "/api/scoreboard.json", response_class=JSONResponse)
-def api_scoreboard(mode: str, amount: int):
- if amount > config.get("api_limit"):
+@router.get(_base_url + "/api/scoreboard.json", response_class=JSONResponse)
+def api_scoreboard(mode: str, amount: int) -> None:
+ if mode is None or amount is None:
+ raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount <= 0:
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to small")
+ elif amount > config.get("api_limit"):
raise HTTPException(status_code=400, detail="Too many results")
if mode == "blocked":
- database.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT blocked AS data, COUNT(blocked) AS score FROM blocks GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
elif mode == "blocker":
- database.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT blocker AS data, COUNT(blocker) AS score FROM blocks GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
elif mode == "reference":
- database.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE origin IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT origin AS data, COUNT(domain) AS score FROM instances WHERE origin IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
elif mode == "original_software":
- database.cursor.execute("SELECT original_software, COUNT(domain) AS score FROM instances WHERE original_software IS NOT NULL GROUP BY original_software ORDER BY score DESC, original_software ASC LIMIT ?", [amount])
+ database.cursor.execute("SELECT original_software AS data, COUNT(domain) AS score FROM instances WHERE original_software IS NOT NULL GROUP BY original_software ORDER BY score DESC, original_software ASC LIMIT ?", [amount])
elif mode == "software":
- database.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software AS data, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
elif mode == "command":
- database.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
+ database.cursor.execute("SELECT command AS data, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
elif mode == "error_code":
- database.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT last_status_code AS data, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
elif mode == "detection_mode":
- database.cursor.execute("SELECT detection_mode, COUNT(domain) AS cnt FROM instances GROUP BY detection_mode ORDER BY cnt DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT detection_mode AS data, COUNT(domain) AS score FROM instances GROUP BY detection_mode ORDER BY score DESC LIMIT ?", [amount])
elif mode == "avg_peers":
- database.cursor.execute("SELECT software, AVG(total_peers) AS average FROM instances WHERE software IS NOT NULL AND total_peers IS NOT NULL GROUP BY software HAVING average > 0 ORDER BY average DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software AS data, AVG(total_peers) AS score FROM instances WHERE software IS NOT NULL AND total_peers IS NOT NULL GROUP BY software HAVING score > 0 ORDER BY score DESC LIMIT ?", [amount])
elif mode == "avg_blocks":
- database.cursor.execute("SELECT software, AVG(total_blocks) AS average FROM instances WHERE software IS NOT NULL AND total_blocks IS NOT NULL GROUP BY software HAVING average > 0 ORDER BY average DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software AS data, AVG(total_blocks) AS score FROM instances WHERE software IS NOT NULL AND total_blocks IS NOT NULL GROUP BY software HAVING score > 0 ORDER BY score DESC LIMIT ?", [amount])
elif mode == "obfuscator":
- database.cursor.execute("SELECT software, COUNT(domain) AS cnt FROM instances WHERE has_obfuscation = 1 GROUP BY software ORDER BY cnt DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software AS data, COUNT(domain) AS score FROM instances WHERE has_obfuscation = 1 GROUP BY software ORDER BY score DESC LIMIT ?", [amount])
elif mode == "obfuscation":
- database.cursor.execute("SELECT has_obfuscation, COUNT(domain) AS cnt FROM instances WHERE software IN ('pleroma', 'lemmy', 'mastodon', 'misskey', 'friendica') GROUP BY has_obfuscation ORDER BY cnt DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT has_obfuscation AS data, COUNT(domain) AS score FROM instances WHERE has_obfuscation = 1 GROUP BY has_obfuscation ORDER BY score DESC LIMIT ?", [amount])
elif mode == "block_level":
- database.cursor.execute("SELECT block_level, COUNT(rowid) AS cnt FROM blocks GROUP BY block_level ORDER BY cnt DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT block_level AS data, COUNT(rowid) AS score FROM blocks GROUP BY block_level ORDER BY score DESC LIMIT ?", [amount])
else:
- raise HTTPException(status_code=400, detail="No filter specified")
+ raise HTTPException(status_code=400, detail=f"mode='{mode}' is not supported")
- scores = list()
+ scores = []
for row in database.cursor.fetchall():
scores.append({
- "domain": row[0],
- "score" : round(row[1]),
+ "data" : row["data"],
+ "score": round(row["score"]),
})
return JSONResponse(status_code=200, content=scores)
-@router.get(config.get("base_url") + "/api/list.json", response_class=JSONResponse)
-def api_list(request: Request, mode: str, value: str, amount: int):
+@router.get(_base_url + "/api/list.json", response_class=JSONResponse)
+def api_list(request: Request, mode: str, value: str, amount: int) -> None:
if mode is None or value is None or amount is None:
raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount <= 0:
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to small")
elif amount > config.get("api_limit"):
raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
- if mode in ("detection_mode", "original_software", "software", "command", "origin"):
+ if mode in ["detection_mode", "original_software", "software", "command", "origin"]:
database.cursor.execute(
f"SELECT * \
FROM instances \
ORDER BY domain \
LIMIT ?", [value, amount]
)
- elif mode == "recently":
+ elif mode == "added":
database.cursor.execute(
- f"SELECT * \
+ "SELECT * \
FROM instances \
ORDER BY first_seen DESC \
+LIMIT ?", [amount]
+ )
+ elif mode == "updated":
+ database.cursor.execute(
+ "SELECT * \
+FROM instances \
+ORDER BY last_updated DESC \
LIMIT ?", [amount]
)
else:
domainlist = database.cursor.fetchall()
return domainlist
-@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def api_index(request: Request, mode: str, value: str, amount: int):
+@router.get(_base_url + "/api/top.json", response_class=JSONResponse)
+def api_top(request: Request, mode: str, value: str, amount: int) -> None:
if mode is None or value is None or amount is None:
raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount <= 0:
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to small")
elif amount > config.get("api_limit"):
raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
domain,
"*." + domain,
wildchar,
- utils.get_hash(domain),
+ utils.hash_domain(domain),
punycode,
"*." + punycode,
amount
domain,
"*." + domain,
wildchar,
- utils.get_hash(domain),
+ utils.hash_domain(domain),
punycode,
"*." + punycode,
amount
amount
])
- blocklist = database.cursor.fetchall()
+ rows = database.cursor.fetchall()
result = {}
- for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
- if reason is not None and reason != "":
- reason = reason.replace(",", " ").replace(" ", " ")
-
- entry = {
- "blocker" : blocker,
- "blocked" : blocked,
- "reason" : reason,
- "first_seen": first_seen,
- "last_seen" : last_seen
- }
-
- if block_level in result:
- result[block_level].append(entry)
+ for row in list(rows):
+ entry = dict(row)
+ entry["reason"] = entry["reason"].replace(",", " ").replace(" ", " ") if entry["reason"] not in [None, ""] else None
+
+ if entry["block_level"] in result:
+ result[entry["block_level"]].append(entry)
else:
- result[block_level] = [entry]
+ result[entry["block_level"]] = [entry]
return result
-@router.get(config.get("base_url") + "/api/domain.json", response_class=JSONResponse)
-def api_domain(domain: str):
+@router.get(_base_url + "/api/domain.json", response_class=JSONResponse)
+def api_domain(domain: str) -> None:
if domain is None:
raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
return JSONResponse(status_code=200, content=dict(domain_data))
-@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
-def api_mutual(domains: list[str] = Query()):
+@router.get(_base_url + "/api/mutual.json", response_class=JSONResponse)
+def api_mutual(domains: list[str] = Query()) -> None:
"""Return 200 if federation is open between the two, 4xx otherwise"""
- database.cursor.execute(
- "SELECT block_level FROM blocks " \
- "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
- "AND block_level = 'reject' " \
- "LIMIT 1",
+ database.cursor.execute("SELECT block_level FROM blocks \
+WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) \
+AND block_level = 'reject' \
+LIMIT 1",
{
"a" : domains[0],
"b" : domains[1],
# No known blocks
return JSONResponse(status_code=200, content={})
-@router.get(config.get("base_url") + "/.well-known/nodeinfo", response_class=JSONResponse)
-def wellknown_nodeinfo(request: Request):
+@router.get(_base_url + "/.well-known/nodeinfo", response_class=JSONResponse)
+def wellknown_nodeinfo(request: Request) -> None:
return JSONResponse(status_code=200, content={
"links": ({
"rel" : "http://nodeinfo.diaspora.software/ns/schema/1.0",
})
})
-@router.get(config.get("base_url") + "/nodeinfo/1.0", response_class=JSONResponse)
-def nodeinfo_1_0(request: Request):
+@router.get(_base_url + "/nodeinfo/1.0", response_class=JSONResponse)
+def nodeinfo_1_0(request: Request) -> None:
return JSONResponse(status_code=200, content={
"version": "1.0",
"software": {
},
})
-@router.get(config.get("base_url") + "/api/v1/instance/domain_blocks", response_class=JSONResponse)
-def api_domain_blocks(request: Request):
+@router.get(_base_url + "/api/v1/instance/domain_blocks", response_class=JSONResponse)
+def api_domain_blocks(request: Request) -> None:
+ blocking = []
blocked = blacklist.get_all()
- blocking = list()
for block in blocked:
blocking.append({
"domain" : block,
- "digest" : utils.get_hash(block),
+ "digest" : utils.hash_domain(block) if validators.domain(block, rfc_2782=True) else None,
"severity": "suspend",
"comment" : blocked[block],
})
return JSONResponse(status_code=200, content=blocking)
-@router.get(config.get("base_url") + "/api/v1/instance/peers", response_class=JSONResponse)
-def api_peers(request: Request):
+@router.get(_base_url + "/api/v1/instance/peers", response_class=JSONResponse)
+def api_peers(request: Request) -> None:
database.cursor.execute("SELECT domain FROM instances WHERE nodeinfo_url IS NOT NULL")
- peers = list()
+ peers = []
for row in database.cursor.fetchall():
peers.append(row["domain"])
return JSONResponse(status_code=200, content=peers)
-@router.get(config.get("base_url") + "/scoreboard")
-def scoreboard(request: Request, mode: str, amount: int):
+@router.get(_base_url + "/scoreboard")
+def scoreboard(request: Request, mode: str, amount: int) -> None:
if mode == "":
raise HTTPException(status_code=400, detail="No mode specified")
elif amount <= 0:
raise HTTPException(status_code=500, detail="Invalid amount specified")
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode={mode}&amount={amount}")
+ response = requests.get(
+ f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode={mode}&amount={amount}",
+ timeout=config.timeout
+ )
if response is None:
raise HTTPException(status_code=500, detail="Could not determine scores")
"scores" : json_helper.from_response(response)
})
-@router.get(config.get("base_url") + "/list")
-def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+@router.get(_base_url + "/list")
+def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")) -> None:
if mode == "detection_mode" and not instances.valid(value, "detection_mode"):
raise HTTPException(status_code=500, detail="Invalid detection mode provided")
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/list.json?mode={mode}&value={value}&amount={amount}")
+ response = requests.get(
+ f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/list.json?mode={mode}&value={value}&amount={amount}",
+ timeout=config.timeout
+ )
- domainlist = list()
+ domainlist = []
if response is not None and response.ok:
domainlist = response.json()
tformat = config.get("timestamp_format")
"theme" : config.get("theme"),
})
-@router.get(config.get("base_url") + "/top")
-def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+@router.get(_base_url + "/top")
+def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")) -> None:
if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
+ response = requests.get(
+ f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}",
+ timeout=config.timeout
+ )
found = 0
- blocklist = list()
+ blocklist = []
if response.ok and response.status_code == 200 and len(response.text) > 0:
blocklist = response.json()
"theme" : config.get("theme"),
})
-@router.get(config.get("base_url") + "/infos")
-def infos(request: Request, domain: str):
+@router.get(_base_url + "/infos")
+def infos(request: Request, domain: str) -> None:
if domain is None:
raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")
+ response = requests.get(
+ f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}",
+ timeout=config.timeout
+ )
if not response.ok or response.status_code > 200 or response.text.strip() == "":
raise HTTPException(status_code=response.status_code, detail=response.reason)
# Format timestamps
tformat = config.get("timestamp_format")
- instance = dict()
+ instance = {}
for key in domain_data.keys():
if key in ["last_nodeinfo", "last_blocked", "first_seen", "last_updated", "last_instance_fetch"] and isinstance(domain_data[key], float):
# Timestamps
"slogan" : config.get("slogan"),
})
-@router.get(config.get("base_url") + "/rss")
-def rss(request: Request, domain: str = None):
+@router.get(_base_url + "/rss")
+def rss(request: Request, domain: str = None) -> None:
if domain is not None:
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
LIMIT ?", [
domain,
"*." + domain, wildchar,
- utils.get_hash(domain),
+ utils.hash_domain(domain),
punycode,
"*." + punycode,
config.get("rss_limit")
for row in result:
blocklist.append({
- "blocker" : row[0],
- "blocked" : row[1],
- "block_level": row[2],
- "reason" : "Provided reason: '" + row[3] + "'" if row[3] is not None and row[3] != "" else "No reason provided.",
- "first_seen" : format_datetime(datetime.fromtimestamp(row[4])),
- "last_seen" : format_datetime(datetime.fromtimestamp(row[5])),
+ "blocker" : row["blocker"],
+ "blocked" : row["blocked"],
+ "block_level": row["block_level"],
+ "reason" : "Provided reason: '" + row["reason"] + "'" if row["reason"] is not None and row["reason"] != "" else "No reason provided.",
+ "first_seen" : format_datetime(datetime.fromtimestamp(row["first_seen"])),
+ "last_seen" : format_datetime(datetime.fromtimestamp(row["last_seen"])),
})
return templates.TemplateResponse("views/rss.xml", {
"Content-Type": "routerlication/rss+xml"
})
-@router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
-def robots(request: Request):
+@router.get(_base_url + "/robots.txt", response_class=PlainTextResponse)
+def robots(request: Request) -> None:
return templates.TemplateResponse("views/robots.txt", {
"request" : request,
- "base_url": config.get("base_url")
+ "base_url": _base_url
})
-@router.get(config.get("base_url") + "/")
-def index(request: Request):
+@router.get(_base_url + "/")
+def index(request: Request) -> None:
# Get info
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+ response = requests.get(
+ f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json",
+ timeout=config.timeout
+ )
if not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)