1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from datetime import datetime
18 from email import utils
22 from fastapi import Request, HTTPException, Query
23 from fastapi.responses import JSONResponse
24 from fastapi.responses import PlainTextResponse
25 from fastapi.templating import Jinja2Templates
32 from fba import config
34 from fba import network
# ASGI application object. The interactive API documentation pages are mounted
# below the configured base URL instead of the server root.
router = fastapi.FastAPI(
    docs_url=config.get("base_url") + "/docs",
    redoc_url=config.get("base_url") + "/redoc",
)

# Template loader used by every view in this module; templates are read from
# the "templates" directory.
templates = Jinja2Templates(directory="templates")
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def api_info():
    """Return aggregate counters about the database plus the configured slogan.

    All four counters are fetched with a single statement made of scalar
    sub-selects, so exactly one row with four columns is read.

    Returns:
        A dict with the keys ``known_instances``, ``indexed_instances``,
        ``blocks_recorded``, ``errorous_instances`` and ``slogan``.
    """
    # NOTE(review): the "def" line was not visible in the reviewed listing; the
    # name follows the sibling api_* handlers - confirm against the real file.
    fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
    row = fba.cursor.fetchone()

    # Column order matches the order of the sub-selects above.
    total_instances, supported_instances, total_blocks, failed_instances = row

    return {
        "known_instances": total_instances,
        "indexed_instances": supported_instances,
        "blocks_recorded": total_blocks,
        "errorous_instances": failed_instances,
        "slogan": config.get("slogan"),
    }
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
def api_top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
    """Return a ranked list for the first filter that was supplied.

    Each parameter is the maximum number of rows to return for its ranking.
    Only one ranking is produced per request; when several parameters are
    given, the first one in the order ``blocked``, ``blockers``, ``reference``,
    ``software``, ``command``, ``error_code`` wins.

    Returns:
        A list of dicts, each holding the first selected column of the chosen
        query under ``domain`` and its count under ``highscore``.

    Raises:
        HTTPException: 400 when no filter is given, when the requested limit is
            negative, or when it exceeds the allowed maximum.
    """
    # NOTE(review): the numeric upper bound was on lines missing from the
    # reviewed listing; 500 is assumed here - confirm against the real file.
    max_results = 500

    # (requested limit, query) pairs in precedence order.
    rankings = (
        (blocked, "SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?"),
        (blockers, "SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?"),
        (reference, "SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?"),
        (software, "SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?"),
        (command, "SELECT command, COUNT(domain) FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY COUNT(domain) DESC, command ASC LIMIT ?"),
        (error_code, "SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?"),
    )

    selected = next(((limit, sql) for limit, sql in rankings if limit is not None), None)
    if selected is None:
        raise HTTPException(status_code=400, detail="No filter specified")

    limit, sql = selected

    # A negative LIMIT is treated as "no limit" by SQLite, which would bypass
    # the upper bound below, so it is rejected explicitly.
    # NOTE(review): assumes the cursor is backed by SQLite (qmark/named
    # placeholders are used throughout this module) - confirm.
    if limit < 0:
        raise HTTPException(status_code=400, detail="Limit must not be negative")
    if limit > max_results:
        raise HTTPException(status_code=400, detail="Too many results")

    fba.cursor.execute(sql, [limit])

    # NOTE(review): the "domain" key was on a line missing from the reviewed
    # listing; it is inferred from the loop variable name - confirm.
    return [
        {"domain": domain, "highscore": highscore}
        for domain, highscore in fba.cursor.fetchall()
    ]
@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
    """Look up block records by blocked domain, by blocker, or by reason keyword.

    Precedence is ``domain``, then ``reverse``, then ``reason``. A supplied
    ``reason`` is validated even when another filter takes precedence.

    Args:
        domain: Blocked domain to search for. Wildcard, hashed and punycode
            variants of the name are matched as well.
        reason: Keyword searched inside the block reason (substring match).
        reverse: Blocker domain whose blocks should be listed.

    Returns:
        A dict mapping each block level to a list of block entries, ordered by
        ``first_seen`` ascending.

    Raises:
        HTTPException: 400 when no filter is given, when the keyword is shorter
            than three characters after wildcard stripping, or when ``domain``
            cannot be IDNA-encoded.
    """
    if domain is None and reason is None and reverse is None:
        raise HTTPException(status_code=400, detail="No filter specified")

    if reason is not None:
        # Drop SQL LIKE wildcards so the caller cannot widen the pattern.
        reason = re.sub("(%|_)", "", reason)
        if len(reason) < 3:
            raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")

    if domain is not None:
        # Wildcard form built from the last N labels, N being the number of
        # dots: "a.b.c" -> "*.b.c". A name without dots keeps all its labels.
        wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])

        # The IDNA codec raises UnicodeError on malformed names (for example an
        # empty or over-long label); report that as a client error.
        try:
            punycode = domain.encode('idna').decode('utf-8')
        except UnicodeError as exception:
            raise HTTPException(status_code=400, detail="Invalid domain") from exception

        fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
            (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
    elif reverse is not None:
        fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
    else:
        fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])

    result = {}
    for blocker, blocked, block_level, block_reason, first_seen, last_seen in fba.cursor.fetchall():
        if block_reason is not None and block_reason != "":
            # Commas become spaces; the doubled spaces that creates are then
            # collapsed (the listing showed a no-op single-space replace here).
            block_reason = block_reason.replace(",", " ").replace("  ", " ")

        # NOTE(review): the "blocker", "blocked" and "reason" keys were on lines
        # missing from the reviewed listing; inferred from the selected columns.
        result.setdefault(block_level, []).append({
            "blocker": blocker,
            "blocked": blocked,
            "reason": block_reason,
            "first_seen": first_seen,
            "last_seen": last_seen,
        })

    return result
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
def api_mutual(domains: list[str] = Query()):
    """Return 200 if federation is open between the two, 4xx otherwise

    Args:
        domains: Repeated ``domains`` query parameter. The first two values are
            compared; any further values are ignored.

    Returns:
        An empty JSON response with status 200 when no 'reject' block exists
        between the two domains in either direction (wildcard "*." entries
        included), or status 418 when such a block is found.

    Raises:
        HTTPException: 400 when fewer than two domains are supplied.
    """
    if len(domains) < 2:
        # Without this guard, domains[1] raises IndexError and surfaces as 500.
        raise HTTPException(status_code=400, detail="Two domains must be specified")

    first, second = domains[0], domains[1]

    # NOTE(review): the "LIMIT 1" fragment and the :a/:b bindings were on lines
    # missing from the reviewed listing. They are inferred from the visible
    # :aw/:bw bindings and from the single fetchone() below.
    fba.cursor.execute(
        "SELECT block_level FROM blocks " \
        "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
        "AND block_level = 'reject' " \
        "LIMIT 1",
        {
            "a": first,
            "b": second,
            "aw": "*." + first,
            "bw": "*." + second,
        },
    )
    response = fba.cursor.fetchone()

    if response is not None:
        # At least one rejecting block exists between the two domains
        return JSONResponse(status_code=418, content={})

    # No known blocks
    return JSONResponse(status_code=200, content={})
@router.get(config.get("base_url") + "/scoreboard")
def scoreboard(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
    """Render the scoreboard page for the first positive filter supplied.

    The scores are fetched over HTTP from this application's own
    ``/api/top.json`` endpoint. Filters are tried in the order ``blockers``,
    ``blocked``, ``reference``, ``software``, ``command``, ``error_code``;
    values that are ``None`` or not greater than zero are skipped.

    Raises:
        HTTPException: 400 when no usable filter is given; otherwise the status
            and body of a failed ``/api/top.json`` call are passed through.
    """
    filters = (
        ("blockers", blockers),
        ("blocked", blocked),
        ("reference", reference),
        ("software", software),
        ("command", command),
        ("error_code", error_code),
    )

    selected = next(((name, amount) for name, amount in filters if amount is not None and amount > 0), None)
    if selected is None:
        raise HTTPException(status_code=400, detail="No filter specified")

    name, amount = selected

    # requests.get() returns a Response or raises, so no None check is needed.
    response = requests.get(
        f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json",
        params={name: amount},
    )

    if not response.ok:
        raise HTTPException(status_code=response.status_code, detail=response.text)

    # NOTE(review): the "request", "scoreboard", "blocked" and "command" context
    # entries were on lines missing from the reviewed listing - confirm against
    # the template.
    return templates.TemplateResponse("views/scoreboard.html", {
        "base_url": config.get("base_url"),
        "slogan": config.get("slogan"),
        "request": request,
        "scoreboard": True,
        "blockers": blockers,
        "blocked": blocked,
        "reference": reference,
        "software": software,
        "command": command,
        "error_code": error_code,
        "scores": network.json_from_response(response),
    })
@router.get(config.get("base_url") + "/")
def index(request: Request):
    """Render the landing page with the statistics from ``/api/info.json``.

    The statistics are fetched over HTTP from this application's own API.

    Raises:
        HTTPException: with the status and body of the API response when the
            statistics request does not succeed.
    """
    info_url = f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json"
    response = requests.get(info_url)

    if not response.ok:
        raise HTTPException(status_code=response.status_code, detail=response.text)

    # NOTE(review): the "request" entry was on a line missing from the reviewed
    # listing; it is included because the handler receives it for the template.
    context = {
        "request": request,
        "info": response.json(),
    }
    return templates.TemplateResponse("views/index.html", context)
@router.get(config.get("base_url") + "/top")
def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
    """Render the block search page, optionally with search results.

    Statistics are always fetched from ``/api/info.json``. When one of
    ``domain``, ``reason`` or ``reverse`` is given (checked in that order), the
    matching blocks are fetched from ``/api/index.json`` and their timestamps
    are formatted for display. Without a filter the page is rendered with an
    empty result.

    Raises:
        HTTPException: 500 when a filter is an empty string or a domain fails
            validation; otherwise the status and body of a failed API call are
            passed through.
    """
    if domain == "" or reason == "" or reverse == "":
        raise HTTPException(status_code=500, detail="Insufficient parameter provided")

    api_base = f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api"

    response = requests.get(f"{api_base}/info.json")
    if not response.ok:
        raise HTTPException(status_code=response.status_code, detail=response.text)

    info = response.json()

    # Work out which single filter is forwarded to the index API.
    query = None
    if domain is not None:
        if not validators.domain(domain):
            raise HTTPException(status_code=500, detail="Invalid domain")
        query = {"domain": domain}
    elif reason is not None:
        query = {"reason": reason}
    elif reverse is not None:
        if not validators.domain(reverse):
            raise HTTPException(status_code=500, detail="Invalid domain")
        query = {"reverse": reverse}

    # Bound up front so the template context is valid when no filter was given.
    blocklist = {}

    if query is not None:
        # "params" URL-encodes the value; pasting it into the URL would let
        # characters such as "&" or "#" corrupt the forwarded query.
        response = requests.get(f"{api_base}/index.json", params=query)
        if not response.ok:
            raise HTTPException(status_code=response.status_code, detail=response.text)

        blocklist = response.json()
        timestamp_format = config.get("timestamp_format")
        for entries in blocklist.values():
            for block in entries:
                block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(timestamp_format)
                block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(timestamp_format)

    # NOTE(review): every context entry except "blocks" was on a line missing
    # from the reviewed listing; the entries are inferred from the handler's
    # parameters and locals - confirm against the template.
    return templates.TemplateResponse("views/top.html", {
        "request": request,
        "domain": domain,
        "blocks": blocklist,
        "reason": reason,
        "reverse": reverse,
        "info": info,
    })
@router.get(config.get("base_url") + "/rss")
def rss(request: Request, domain: str = None):
    """Render an RSS feed of the 50 most recently first-seen blocks.

    Args:
        request: Incoming request, handed to the template.
        domain: Optional blocked domain. When given, only blocks of that domain
            (including its wildcard, hashed and punycode variants) are listed.

    Raises:
        HTTPException: 400 when ``domain`` cannot be IDNA-encoded.
    """
    # Imported locally so this block does not depend on a change to the
    # module's import section.
    from datetime import timezone

    if domain is not None:
        # Wildcard form built from the last N labels, N being the number of
        # dots: "a.b.c" -> "*.b.c". A name without dots keeps all its labels.
        wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])

        # The IDNA codec raises UnicodeError on malformed names; report that as
        # a client error rather than an internal server error.
        try:
            punycode = domain.encode('idna').decode('utf-8')
        except UnicodeError as exception:
            raise HTTPException(status_code=400, detail="Invalid domain") from exception

        fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
            (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
    else:
        fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")

    blocklist = []
    for blocker, blocked, block_level, reason, first_seen, last_seen in fba.cursor.fetchall():
        # format_datetime() labels a naive datetime as UTC ("-0000"), so the
        # value must really be UTC instead of server-local wall-clock time.
        first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen, tz=timezone.utc))

        if reason is None or reason == '':
            reason = "No reason provided."
        else:
            reason = "Provided reason: '" + reason + "'"

        # NOTE(review): the "blocker", "blocked" and "reason" keys were on lines
        # missing from the reviewed listing; inferred from the selected columns.
        blocklist.append({
            "blocker": blocker,
            "blocked": blocked,
            "block_level": block_level,
            "reason": reason,
            "first_seen": first_seen,
            "last_seen": last_seen,
        })

    # NOTE(review): the "request", "domain" and "blocks" context entries were on
    # lines missing from the reviewed listing - confirm against the template.
    return templates.TemplateResponse("rss.xml", {
        "request": request,
        "timestamp": utils.format_datetime(datetime.now(tz=timezone.utc)),
        "domain": domain,
        "hostname": config.get("hostname"),
        "blocks": blocklist,
    }, headers={
        # Was "routerlication/rss+xml", which is not a valid media type.
        "Content-Type": "application/rss+xml"
    })
@router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
def robots(request: Request):
    """Render ``robots.txt`` from its template as plain text.

    The handler returns a response object itself, so the ``response_class`` in
    the decorator does not set the content type of what is actually sent. The
    media type is therefore passed to the template response explicitly.
    """
    # NOTE(review): the "request" entry was on a line missing from the reviewed
    # listing; it is included because the handler receives it for the template.
    return templates.TemplateResponse("robots.txt", {
        "request": request,
        "base_url": config.get("base_url")
    }, media_type="text/plain")
if __name__ == "__main__":
    # Start the ASGI server when this file is executed directly. The
    # application is referenced by import string ("api:router"), and the
    # listen address, port and log level all come from the configuration.
    uvicorn.run(
        "api:router",
        host=config.get("host"),
        port=config.get("port"),
        log_level=config.get("log_level"),
    )