1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from datetime import datetime
18 from email import utils
22 from fastapi import Request, HTTPException, Query
23 from fastapi.responses import JSONResponse
24 from fastapi.responses import PlainTextResponse
25 from fastapi.templating import Jinja2Templates
32 from fba import config
# Application object on which every route in this module is registered; the
# Swagger UI and ReDoc pages are mounted below the configured base URL.
router = fastapi.FastAPI(
    docs_url=config.get("base_url") + "/docs",
    redoc_url=config.get("base_url") + "/redoc",
)

# Jinja2 template loader rooted at the local "templates" directory.
templates = Jinja2Templates(directory="templates")
# Handler for GET <base_url>/api/info.json (declared response class: JSONResponse).
# NOTE(review): the embedded numbering skips 39, 42-43 and 49-50, so the
# handler's `def` line and the lines that open/close the mapping below are not
# visible in this view — confirm against the full file.
38 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
# One SELECT built from four scalar sub-queries, in this order:
#   1. COUNT(domain) over all of `instances`
#   2. COUNT(domain) restricted to the listed software names
#   3. COUNT(blocker) over `blocks`
#   4. COUNT(domain) where last_status_code IS NOT NULL
40 fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
# Unpack the single result row in the same order as the sub-queries above.
41 known, indexed, blocklist, errorous = fba.cursor.fetchone()
# Keys of the reported mapping; "slogan" is read from configuration rather
# than from the database.
44 "known_instances" : known,
45 "indexed_instances" : indexed,
46 "blocks_recorded" : blocklist,
47 "errorous_instances": errorous,
48 "slogan" : config.get("slogan")
# Handler for GET <base_url>/api/top.json: runs one "top N" ranking query.
#
# Every parameter is an optional integer that is bound as the SQL LIMIT of its
# query (note that `software` and `command` are therefore row counts, not
# names). The if/elif chain means only the first parameter that is not None is
# honoured, in the order: blocked, blockers, reference, software, command,
# error_code.
#
# NOTE(review): the condition lines that guard each "Too many results" raise
# are not visible in this view (embedded numbering skips 54, 58, 62, ...), nor
# are the `else:` before "No filter specified" and the lines that build and
# return the result list — confirm against the full file.
# NOTE(review): the value is passed to LIMIT unchanged. If the backing database
# is SQLite, a negative LIMIT means "no upper bound", so a guard that only
# checks an upper limit would be bypassed by e.g. -1 — confirm the hidden guard
# also rejects values below 1.
51 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
52 def api_top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
# Most frequently blocked domains, counting only 'reject' level blocks.
53 if blocked is not None:
55 raise HTTPException(status_code=400, detail="Too many results")
56 fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
# Instances issuing the most 'reject' level blocks.
57 elif blockers is not None:
59 raise HTTPException(status_code=400, detail="Too many results")
60 fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
# `origin` values ranked by how many instance rows (with known software) carry them.
61 elif reference is not None:
63 raise HTTPException(status_code=400, detail="Too many results")
64 fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
# Software names ranked by instance count; ties are ordered alphabetically.
65 elif software is not None:
67 raise HTTPException(status_code=400, detail="Too many results")
68 fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
# `command` values ranked by instance count; ties are ordered alphabetically.
69 elif command is not None:
71 raise HTTPException(status_code=400, detail="Too many results")
72 fba.cursor.execute("SELECT command, COUNT(domain) FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY COUNT(domain) DESC, command ASC LIMIT ?", [command])
# Most common last status codes, excluding NULL and '200'.
73 elif error_code is not None:
75 raise HTTPException(status_code=400, detail="Too many results")
76 fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
# Raised when none of the six filters was supplied (presumably the final
# `else:` branch, which is not visible here).
78 raise HTTPException(status_code=400, detail="No filter specified")
# Each row is a (name, count) pair regardless of which query ran.
80 scores = fba.cursor.fetchall()
# The first column is always called `domain` here even when it actually holds
# a software name, command or status code.
84 for domain, highscore in scores:
87 "highscore": highscore
# Handler for GET <base_url>/api/index.json: looks up block records by blocked
# domain, by blocking instance (`reverse`) or by a keyword in the reason, and
# groups the matches by block level.
#
# At least one of the three parameters is required (400 otherwise). When more
# than one is given, the query is chosen in the order domain, reverse, reason —
# although `reason` is still sanitised and validated first.
#
# NOTE(review): several lines are not visible in this view (embedded numbering
# skips 99, 109, 114, 118-122, 125, 129, 132): the guard before the "shorter
# than three characters" raise, two `else:` lines, the initialisation of
# `result`, most of the `entry` mapping and the final return — confirm against
# the full file.
92 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
93 def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
94 if domain is None and reason is None and reverse is None:
95 raise HTTPException(status_code=400, detail="No filter specified")
97 if reason is not None:
# Strip the SQL LIKE wildcards so the caller cannot widen the pattern that is
# built further down as "%" + reason + "%".
98 reason = re.sub("(%|_)", "", reason)
100 raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
102 if domain is not None:
# "*." followed by the last `domain.count(".")` labels, i.e. the domain with
# its left-most label replaced by a wildcard ("a.b.c" -> "*.b.c"). For a name
# without dots the slice [-0:] keeps everything, giving "*." + domain.
103 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# ASCII (IDNA) spelling of the domain, so records stored in either form match.
# NOTE(review): str.encode('idna') raises UnicodeError for labels it cannot
# encode; no handler is visible here — confirm how that surfaces to the client.
104 punycode = domain.encode('idna').decode('utf-8')
# Six candidate spellings are matched against `blocked`: the domain itself,
# "*." + domain, the parent wildcard, the value of fba.get_hash(domain)
# (presumably for records stored in hashed form — verify against fba), the
# IDNA form, and "*." + IDNA form.
105 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
106 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
# Reverse lookup: everything blocked by the given instance.
107 elif reverse is not None:
108 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
# Keyword search over non-empty reasons (presumably the `else:` branch).
110 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
112 blocklist = fba.cursor.fetchall()
# The loop variable `reason` shadows the `reason` parameter from here on.
115 for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
116 if reason is not None and reason != "":
# Commas are replaced by spaces. NOTE(review): as shown, the second replace
# swaps a single space for a single space and does nothing; it may have
# collapsed a double space in this view — confirm against the full file.
117 reason = reason.replace(",", " ").replace(" ", " ")
123 "first_seen": first_seen,
124 "last_seen" : last_seen
# Group the entries by block level: append to an existing list or start one.
127 if block_level in result:
128 result[block_level].append(entry)
130 result[block_level] = [entry]
# Handler for GET <base_url>/api/mutual.json: reports whether either of two
# domains has a 'reject' level block recorded against the other.
#
# `domains` is a required, repeatable query parameter; only domains[0] and
# domains[1] are read.
# NOTE(review): no length check is visible, so a request carrying a single
# `domains` value would raise IndexError at domains[1] — confirm.
# NOTE(review): the line opening the execute call, the remainder of the SQL
# text and the ":a" / ":b" bindings are not visible in this view (embedded
# numbering skips 137 and 141-144) — confirm against the full file.
134 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
135 def api_mutual(domains: list[str] = Query()):
136 """Return 200 if federation is open between the two, 4xx otherwise"""
# Matches rows whose blocker is either domain and whose blocked value is
# either domain or its "*." wildcard form. As written this also matches a
# domain that blocks itself.
138 "SELECT block_level FROM blocks " \
139 "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
140 "AND block_level = 'reject' " \
# Wildcard spellings of both domains for the :aw / :bw placeholders.
145 "aw": "*." + domains[0],
146 "bw": "*." + domains[1],
# Only the existence of a row matters, so a single fetch is enough.
149 response = fba.cursor.fetchone()
# A matching row means at least one side rejects the other: answer 418.
151 if response is not None:
153 return JSONResponse(status_code=418, content={})
# No matching row: answer 200 with an empty JSON object.
156 return JSONResponse(status_code=200, content={})
# Handler for GET <base_url>/scoreboard: renders the scoreboard page from the
# data served by this application's own /api/top.json endpoint.
#
# The first parameter that is both not None and greater than zero selects the
# ranking, in the order blockers, blocked, reference, software, command,
# error_code. The data is fetched over HTTP from the configured host and port.
#
# NOTE(review): the `else:` before "No filter specified", the guard before
# "Could not determine scores" and several template context keys are not
# visible in this view (embedded numbering skips 172, 175, 183-184, 186, 189)
# — confirm against the full file.
# NOTE(review): requests.get is called without a timeout, so a stalled
# upstream call would block this handler — confirm whether that is intended.
158 @router.get(config.get("base_url") + "/scoreboard")
159 def scoreboard(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
160 if blockers is not None and blockers > 0:
161 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
162 elif blocked is not None and blocked > 0:
163 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
164 elif reference is not None and reference > 0:
165 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
166 elif software is not None and software > 0:
167 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
168 elif command is not None and command > 0:
169 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?command={command}")
170 elif error_code is not None and error_code > 0:
171 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
# Reached when no parameter was both supplied and positive.
173 raise HTTPException(status_code=400, detail="No filter specified")
176 raise HTTPException(status_code=500, detail="Could not determine scores")
# A non-OK upstream answer is passed on with its own status code and body text.
177 elif not response.ok:
178 raise HTTPException(status_code=response.status_code, detail=response.text)
# Template context: configuration values, the filter values echoed back, and
# the decoded upstream payload under "scores".
180 return templates.TemplateResponse("views/scoreboard.html", {
181 "base_url" : config.get("base_url"),
182 "slogan" : config.get("slogan"),
185 "blockers" : blockers,
187 "reference" : reference,
188 "software" : software,
190 "error_code": error_code,
191 "scores" : fba.json_from_response(response)
# Handler for GET <base_url>/: renders the landing page with the statistics
# served by this application's own /api/info.json endpoint.
# NOTE(review): the guard before the raise and part of the template context
# are not visible in this view (embedded numbering skips 196, 198-199, 203,
# 205) — confirm against the full file.
194 @router.get(config.get("base_url") + "/")
195 def index(request: Request):
# Fetch the statistics over HTTP from the configured host and port.
197 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
# An upstream failure is passed on with its own status code and body text.
200 raise HTTPException(status_code=response.status_code, detail=response.text)
202 return templates.TemplateResponse("views/index.html", {
# Decoded JSON body of the info endpoint, exposed to the template as "info".
204 "info" : response.json()
# Handler for GET <base_url>/top: renders the block-list page for a blocked
# domain, a reason keyword or a blocking instance (`reverse`), using this
# application's own /api/info.json and /api/index.json endpoints.
#
# NOTE(review): several lines are not visible in this view (embedded numbering
# skips 213-214, 218-219, 234, 243-244, 246-249), including the guards before
# both status-forwarding raises, any initialisation between the info call and
# the filter chain, and most of the template context — confirm against the
# full file.
207 @router.get(config.get("base_url") + "/top")
208 def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
# A parameter that is present but empty is rejected.
# NOTE(review): this is a client-side input error reported with status 500;
# 400 would be the conventional code — confirm intent. The same applies to the
# two "Invalid domain" raises below.
209 if domain == "" or reason == "" or reverse == "":
210 raise HTTPException(status_code=500, detail="Insufficient parameter provided")
212 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
215 raise HTTPException(status_code=response.status_code, detail=response.text)
217 info = response.json()
# Only the first supplied filter is used, in the order domain, reason, reverse.
# NOTE(review): the values are interpolated into the query string without URL
# encoding; `reason` is not validated at all, so characters such as "&" or "#"
# would change the forwarded request — consider passing them via `params=`.
220 if domain is not None:
221 if not validators.domain(domain):
222 raise HTTPException(status_code=500, detail="Invalid domain")
224 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
225 elif reason is not None:
226 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
227 elif reverse is not None:
228 if not validators.domain(reverse):
229 raise HTTPException(status_code=500, detail="Invalid domain")
231 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
233 if response is not None:
235 raise HTTPException(status_code=response.status_code, detail=response.text)
# The payload maps each block level to a list of block entries.
236 blocklist = response.json()
237 for block_level in blocklist:
238 for block in blocklist[block_level]:
# Replace the numeric timestamps with UTC strings in the configured format.
# NOTE(review): datetime.utcfromtimestamp is deprecated as of Python 3.12.
239 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
240 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
242 return templates.TemplateResponse("views/top.html", {
245 "blocks" : blocklist,
# Handler for GET <base_url>/rss: renders the 50 most recently first-seen
# block records, optionally restricted to one blocked domain, through the
# "rss.xml" template.
#
# NOTE(review): several lines are not visible in this view (embedded numbering
# skips 258, 262-263, 268, 270-273, 275, 278-279, 281, 283, 285-286),
# including the `else:` lines, the construction of each feed entry and part of
# the template context — confirm against the full file.
251 @router.get(config.get("base_url") + "/rss")
252 def rss(request: Request, domain: str = None):
253 if domain is not None:
# Parent-domain wildcard: "*." plus the last `domain.count(".")` labels
# ("a.b.c" -> "*.b.c"; a name without dots yields "*." + domain).
254 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# ASCII (IDNA) spelling of the domain.
# NOTE(review): str.encode('idna') raises UnicodeError for labels it cannot
# encode and `domain` is not validated beforehand — confirm handling.
255 punycode = domain.encode('idna').decode('utf-8')
# Match the domain, its wildcard forms, the fba.get_hash() value and the IDNA
# forms against `blocked`; newest first, capped at 50 rows.
256 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
257 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
# Without a domain: the 50 newest block records overall (presumably the
# `else:` branch).
259 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
261 result = fba.cursor.fetchall()
264 for blocker, blocked, block_level, reason, first_seen, last_seen in result:
# Convert the numeric timestamp into an RFC 2822 date string for the feed.
# NOTE(review): datetime.fromtimestamp() without a tz yields a naive local
# time, and email.utils.format_datetime labels naive values "-0000"; on a
# host that is not on UTC the published time would be off — confirm.
265 first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
# Human-readable reason text for the feed entry.
266 if reason is None or reason == '':
267 reason = "No reason provided."
269 reason = "Provided reason: '" + reason + "'"
274 "block_level": block_level,
276 "first_seen" : first_seen,
# Unlike first_seen, last_seen is passed on without conversion.
277 "last_seen" : last_seen,
280 return templates.TemplateResponse("rss.xml", {
# Build time of the feed, formatted the same way as the entry dates.
282 "timestamp": utils.format_datetime(datetime.now()),
284 "hostname" : config.get("hostname"),
# NOTE(review): "routerlication/rss+xml" is not a valid media type; it looks
# like "application/rss+xml" damaged by a rename of `app` to `router` —
# confirm and correct, otherwise feed readers may reject the response.
287 "Content-Type": "routerlication/rss+xml"
# Handler for GET <base_url>/robots.txt: renders the "robots.txt" template
# with the configured base URL.
# NOTE(review): the route declares response_class=PlainTextResponse but the
# handler returns a TemplateResponse. FastAPI sends a returned Response object
# as-is, so the template response's own media type probably applies — confirm
# that the file is actually served as text/plain.
# NOTE(review): part of the template context and the closing of the call are
# not visible in this view (embedded numbering skips 293 and 295).
290 @router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
291 def robots(request: Request):
292 return templates.TemplateResponse("robots.txt", {
294 "base_url": config.get("base_url")
if __name__ == "__main__":
    # Script entry point: serve the application object `router` from module
    # `api` with uvicorn, taking bind address, port and log level from the
    # configuration.
    uvicorn.run(
        "api:router",
        host=config.get("host"),
        port=config.get("port"),
        log_level=config.get("log_level"),
    )