-#!/usr/bin/python3
+#!venv/bin/python3
# -*- coding: utf-8 -*-
# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
-import uvicorn
import requests
+import uvicorn
from fba import database
from fba import utils
+from fba.helpers import blacklist
from fba.helpers import config
+from fba.helpers import domain as domain_helper
from fba.helpers import json as json_helper
from fba.helpers import tidyup
from fba.models import blocks
+from fba.models import instances
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
router.mount(
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def api_info():
- database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
+ database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances) AS total_websites, (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe', 'gotosocial', 'brighteon', 'wildebeest', 'bookwyrm', 'mitra', 'areionskey', 'mammuthus', 'neodb', 'smithereen', 'vebinet', 'hugo', 'toki', 'snac')) AS supported_instances, (SELECT COUNT(blocker) FROM blocks) AS total_blocks, (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL) AS erroneous_instances")
+
row = database.cursor.fetchone()
- return {
- "known_instances" : row[0],
- "supported_instances": row[1],
- "blocks_recorded" : row[2],
- "erroneous_instances": row[3],
- }
+ return JSONResponse(status_code=200, content={
+ "total_websites" : row["total_websites"],
+ "supported_instances": row["supported_instances"],
+ "total_blocks" : row["total_blocks"],
+ "erroneous_instances": row["erroneous_instances"],
+ })
+
@router.get(config.get("base_url") + "/api/scoreboard.json", response_class=JSONResponse)
def api_scoreboard(mode: str, amount: int):
database.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
elif mode == "reference":
database.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE origin IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
+ elif mode == "original_software":
+ database.cursor.execute("SELECT original_software, COUNT(domain) AS score FROM instances WHERE original_software IS NOT NULL GROUP BY original_software ORDER BY score DESC, original_software ASC LIMIT ?", [amount])
elif mode == "software":
database.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
elif mode == "command":
elif mode == "detection_mode":
database.cursor.execute("SELECT detection_mode, COUNT(domain) AS cnt FROM instances GROUP BY detection_mode ORDER BY cnt DESC LIMIT ?", [amount])
elif mode == "avg_peers":
- database.cursor.execute("SELECT software, AVG(total_peers) AS average FROM instances WHERE software IS NOT NULL GROUP BY software HAVING average>0 ORDER BY average DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software, AVG(total_peers) AS average FROM instances WHERE software IS NOT NULL AND total_peers IS NOT NULL GROUP BY software HAVING average > 0 ORDER BY average DESC LIMIT ?", [amount])
+ elif mode == "avg_blocks":
+ database.cursor.execute("SELECT software, AVG(total_blocks) AS average FROM instances WHERE software IS NOT NULL AND total_blocks IS NOT NULL GROUP BY software HAVING average > 0 ORDER BY average DESC LIMIT ?", [amount])
elif mode == "obfuscator":
database.cursor.execute("SELECT software, COUNT(domain) AS cnt FROM instances WHERE has_obfuscation = 1 GROUP BY software ORDER BY cnt DESC LIMIT ?", [amount])
elif mode == "obfuscation":
- database.cursor.execute("SELECT has_obfuscation, COUNT(domain) AS cnt FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica') GROUP BY has_obfuscation ORDER BY cnt DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT has_obfuscation, COUNT(domain) AS cnt FROM instances WHERE software IN ('pleroma', 'lemmy', 'mastodon', 'misskey', 'friendica') GROUP BY has_obfuscation ORDER BY cnt DESC LIMIT ?", [amount])
elif mode == "block_level":
database.cursor.execute("SELECT block_level, COUNT(rowid) AS cnt FROM blocks GROUP BY block_level ORDER BY cnt DESC LIMIT ?", [amount])
else:
scores = list()
- for domain, score in database.cursor.fetchall():
+ for row in database.cursor.fetchall():
scores.append({
- "domain": domain,
- "score" : round(score)
+ "domain": row[0],
+ "score" : round(row[1]),
})
- return scores
+ return JSONResponse(status_code=200, content=scores)
+
+@router.get(config.get("base_url") + "/api/list.json", response_class=JSONResponse)
+def api_list(request: Request, mode: str, value: str, amount: int):
+ if mode is None or value is None or amount is None:
+ raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount > config.get("api_limit"):
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
+
+ if mode in ("detection_mode", "original_software", "software", "command", "origin"):
+ database.cursor.execute(
+ f"SELECT * \
+FROM instances \
+WHERE {mode} = ? \
+ORDER BY domain \
+LIMIT ?", [value, amount]
+ )
+ elif mode == "recently":
+ database.cursor.execute(
+ f"SELECT * \
+FROM instances \
+ORDER BY first_seen DESC \
+LIMIT ?", [amount]
+ )
+ else:
+ raise HTTPException(status_code=500, detail=f"mode='{mode}' is unsupported")
+
+ domainlist = database.cursor.fetchall()
+ return domainlist
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
def api_index(request: Request, mode: str, value: str, amount: int):
)
elif mode in ["domain", "reverse"]:
domain = tidyup.domain(value)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
return result
+@router.get(config.get("base_url") + "/api/domain.json", response_class=JSONResponse)
+def api_domain(domain: str):
+ if domain is None:
+ raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
+
+ # Tidy up domain name
+ domain = tidyup.domain(domain).encode("idna").decode("utf-8")
+
+ if not domain_helper.is_wanted(domain):
+ raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
+
+ # Fetch domain data
+ database.cursor.execute("SELECT * FROM instances WHERE domain = ? LIMIT 1", [domain])
+ domain_data = database.cursor.fetchone()
+
+ if domain_data is None:
+ raise HTTPException(status_code=404, detail=f"domain='{domain}' not found")
+
+ return JSONResponse(status_code=200, content=dict(domain_data))
+
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
def api_mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
"bw": "*." + domains[1],
},
)
- response = database.cursor.fetchone()
- if response is not None:
+ if database.cursor.fetchone() is not None:
# Blocks found
return JSONResponse(status_code=418, content={})
# No known blocks
return JSONResponse(status_code=200, content={})
+@router.get(config.get("base_url") + "/.well-known/nodeinfo", response_class=JSONResponse)
+def wellknown_nodeinfo(request: Request):
+ return JSONResponse(status_code=200, content={
+ "links": ({
+ "rel" : "http://nodeinfo.diaspora.software/ns/schema/1.0",
+ "href": f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}/nodeinfo/1.0"
+ })
+ })
+
+@router.get(config.get("base_url") + "/nodeinfo/1.0", response_class=JSONResponse)
+def nodeinfo_1_0(request: Request):
+ return JSONResponse(status_code=200, content={
+ "version": "1.0",
+ "software": {
+ "name": "FBA",
+ "version": "0.1",
+ },
+ "protocols": {
+ "inbound": (),
+ "outbound": (
+ "rss",
+ ),
+ },
+ "services": {
+ "inbound": (),
+ "outbound": (
+ "rss",
+ ),
+ },
+ "usage": {
+ "users": {},
+ },
+ "openRegistrations": False,
+ "metadata": {
+ "nodeName": "Fedi Block API",
+ "protocols": {
+ "inbound": (),
+ "outbound": (
+ "rss",
+ ),
+ },
+ "services": {
+ "inbound": (),
+ "outbound": (
+ "rss",
+ ),
+ },
+ "explicitContent": False,
+ },
+ })
+
+@router.get(config.get("base_url") + "/api/v1/instance/domain_blocks", response_class=JSONResponse)
+def api_domain_blocks(request: Request):
+ blocked = blacklist.get_all()
+ blocking = list()
+
+ for block in blocked:
+ blocking.append({
+ "domain" : block,
+ "digest" : utils.get_hash(block),
+ "severity": "suspend",
+ "comment" : blocked[block],
+ })
+
+ return JSONResponse(status_code=200, content=blocking)
+
+@router.get(config.get("base_url") + "/api/v1/instance/peers", response_class=JSONResponse)
+def api_peers(request: Request):
+ database.cursor.execute("SELECT domain FROM instances WHERE nodeinfo_url IS NOT NULL")
+
+ peers = list()
+ for row in database.cursor.fetchall():
+ peers.append(row["domain"])
+
+ return JSONResponse(status_code=200, content=peers)
+
@router.get(config.get("base_url") + "/scoreboard")
def scoreboard(request: Request, mode: str, amount: int):
- response = None
-
- if mode == "blocker" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocker&amount={amount}")
- elif mode == "blocked" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocked&amount={amount}")
- elif mode == "reference" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=reference&amount={amount}")
- elif mode == "software" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=software&amount={amount}")
- elif mode == "command" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=command&amount={amount}")
- elif mode == "error_code" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=error_code&amount={amount}")
- elif mode == "detection_mode" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=detection_mode&amount={amount}")
- elif mode == "avg_peers" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=avg_peers&amount={amount}")
- elif mode == "obfuscator" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=obfuscator&amount={amount}")
- elif mode == "obfuscation" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=obfuscation&amount={amount}")
- elif mode == "block_level" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=block_level&amount={amount}")
- else:
- raise HTTPException(status_code=400, detail="No filter specified")
+ if mode == "":
+ raise HTTPException(status_code=400, detail="No mode specified")
+ elif amount <= 0:
+ raise HTTPException(status_code=500, detail="Invalid amount specified")
+
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode={mode}&amount={amount}")
if response is None:
raise HTTPException(status_code=500, detail="Could not determine scores")
raise HTTPException(status_code=response.status_code, detail=response.text)
return templates.TemplateResponse("views/scoreboard.html", {
- "base_url" : config.get("base_url"),
+ "base_url" : utils.base_url(),
"slogan" : config.get("slogan"),
"theme" : config.get("theme"),
"request" : request,
"scores" : json_helper.from_response(response)
})
-@router.get(config.get("base_url") + "/")
-def index(request: Request):
- # Get info
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+@router.get(config.get("base_url") + "/list")
+def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+ if mode == "detection_mode" and not instances.valid(value, "detection_mode"):
+ raise HTTPException(status_code=500, detail="Invalid detection mode provided")
- if not response.ok:
- raise HTTPException(status_code=response.status_code, detail=response.text)
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/list.json?mode={mode}&value={value}&amount={amount}")
- return templates.TemplateResponse("views/index.html", {
- "request": request,
- "theme" : config.get("theme"),
- "info" : response.json(),
- "slogan" : config.get("slogan"),
+ domainlist = list()
+ if response is not None and response.ok:
+ domainlist = response.json()
+ tformat = config.get("timestamp_format")
+ for row in domainlist:
+ row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(tformat)
+ row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(tformat) if isinstance(row["last_updated"], float) else None
+
+ return templates.TemplateResponse("views/list.html", {
+ "request" : request,
+ "mode" : mode if response is not None else None,
+ "value" : value if response is not None else None,
+ "amount" : amount if response is not None else None,
+ "found" : len(domainlist),
+ "domainlist": domainlist,
+ "slogan" : config.get("slogan"),
+ "theme" : config.get("theme"),
})
@router.get(config.get("base_url") + "/top")
def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
-
- if not response.ok:
- raise HTTPException(status_code=response.status_code, detail=response.text)
- elif mode == "" or value == "" or amount == 0:
- raise HTTPException(status_code=500, detail="Parameter mode, value and amount must always be set")
- elif amount > config.get("api_limit"):
- raise HTTPException(status_code=500, detail=f"amount='{amount}' is to big")
-
- info = response.json()
- response = None
- blocklist = list()
-
- if mode == "block_level" and not blocks.is_valid_level(value):
+ if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
- elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
+ elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
- if response is not None:
+ found = 0
+ blocklist = list()
+ if response.ok and response.status_code == 200 and len(response.text) > 0:
blocklist = response.json()
- found = 0
- for block_level in blocklist:
- for block in blocklist[block_level]:
- block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
- block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
- found = found + 1
+ tformat = config.get("timestamp_format")
+ for block_level in blocklist:
+ for row in blocklist[block_level]:
+ row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(tformat)
+ row["last_seen"] = datetime.utcfromtimestamp(row["last_seen"]).strftime(tformat) if isinstance(row["last_seen"], float) else None
+ found = found + 1
return templates.TemplateResponse("views/top.html", {
"request" : request,
"amount" : amount if response is not None else None,
"found" : found,
"blocklist": blocklist,
- "info" : info,
+ "slogan" : config.get("slogan"),
"theme" : config.get("theme"),
})
@router.get(config.get("base_url") + "/infos")
-def rss(request: Request, domain: str):
+def infos(request: Request, domain: str):
+ if domain is None:
+ raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
+
# Tidy up domain name
- domain = tidyup.domain(domain)
+ domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
- # Fetch domain data
- database.cursor.execute("SELECT * FROM instances WHERE domain = ? LIMIT 1", [domain])
- domain_data = database.cursor.fetchone()
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")
- if domain_data is None:
- raise HTTPException(status_code=404, detail=f"domain='{domain}' not found")
+ if not response.ok or response.status_code > 200 or response.text.strip() == "":
+ raise HTTPException(status_code=response.status_code, detail=response.reason)
+
+ domain_data = response.json()
# Format timestamps
- format = config.get("timestamp_format")
+ tformat = config.get("timestamp_format")
instance = dict()
for key in domain_data.keys():
- if key in ["last_nodeinfo", "last_blocked", "first_seen", "last_updated", "last_instance_fetch"]:
+ if key in ["last_nodeinfo", "last_blocked", "first_seen", "last_updated", "last_instance_fetch"] and isinstance(domain_data[key], float):
# Timestamps
- instance[key] = datetime.utcfromtimestamp(domain_data[key]).strftime(format) if isinstance(domain_data[key], float) else "-"
+ instance[key] = datetime.utcfromtimestamp(domain_data[key]).strftime(tformat)
else:
# Generic
instance[key] = domain_data[key]
@router.get(config.get("base_url") + "/rss")
def rss(request: Request, domain: str = None):
if domain is not None:
- domain = tidyup.domain(domain)
+ domain = tidyup.domain(domain).encode("idna").decode("utf-8")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode("idna").decode("utf-8")
- database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT ?", [
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen \
+FROM blocks \
+WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? \
+ORDER BY first_seen DESC \
+LIMIT ?", [
domain,
"*." + domain, wildchar,
utils.get_hash(domain),
config.get("rss_limit")
])
else:
- database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT ?", [config.get("rss_limit")])
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen \
+FROM blocks \
+ORDER BY first_seen DESC \
+LIMIT ?", [config.get("rss_limit")])
result = database.cursor.fetchall()
blocklist = []
"request" : request,
"timestamp": format_datetime(datetime.now()),
"domain" : domain,
+ "scheme" : config.get("scheme"),
"hostname" : config.get("hostname"),
"blocks" : blocklist
}, headers={
"base_url": config.get("base_url")
})
+@router.get(config.get("base_url") + "/")
+def index(request: Request):
+ # Get info
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+
+ if not response.ok:
+ raise HTTPException(status_code=response.status_code, detail=response.text)
+
+ return templates.TemplateResponse("views/index.html", {
+ "request": request,
+ "theme" : config.get("theme"),
+ "info" : response.json(),
+ "slogan" : config.get("slogan"),
+ })
+
if __name__ == "__main__":
- uvicorn.run("daemon:router", host=config.get("host"), port=config.get("port"), log_level=config.get("log_level"))
+ uvicorn.run(
+ "daemon:router",
+ host=config.get("host"),
+ port=config.get("port"),
+ log_level=config.get("log_level"),
+ proxy_headers=True
+ )