1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from datetime import datetime
18 from email import utils
22 from fastapi import Request, HTTPException, Query
23 from fastapi.responses import JSONResponse
24 from fastapi.responses import PlainTextResponse
25 from fastapi.templating import Jinja2Templates
32 from fba import config
34 from fba import network
# Application object on which every route in this file is registered. Its two
# documentation pages (docs_url / redoc_url) are served beneath the configured
# "base_url" prefix.
# NOTE(review): this listing shows `from fastapi import ...` but no plain
# `import fastapi`; confirm the module name is imported in the full file.
36 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
# Jinja2 template loader rooted at the "templates" directory; used by the
# HTML, RSS and robots.txt views.
37 templates = Jinja2Templates(directory="templates")
# GET <base_url>/api/info.json -- summary counters plus the configured slogan.
# NOTE(review): the handler's `def` line and the lines opening/closing the
# returned mapping are not present in this listing (the embedded numbering
# skips 40, 43-44 and 50-51), so the function name and exact return statement
# cannot be confirmed from here.
39 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
# A single statement with four scalar sub-selects: COUNT(domain) over all
# instances, COUNT(domain) over instances whose software is in the listed set,
# COUNT(blocker) over blocks, and COUNT(domain) over instances that have a
# non-NULL last_status_code.
41 fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
# The single result row is unpacked in the same order as the sub-selects above.
42 known, indexed, blocklist, errorous = fba.cursor.fetchone()
45 "known_instances" : known,
46 "indexed_instances" : indexed,
47 "blocks_recorded" : blocklist,
# Filled from the `last_status_code IS NOT NULL` count, i.e. any recorded
# status code; the query itself does not restrict this to error codes.
48 "errorous_instances": errorous,
49 "slogan" : config.get("slogan")
# GET <base_url>/api/top.json -- ranked "top N" list for exactly one criterion.
# The first parameter that is not None, tested in the order blocked, blockers,
# reference, software, command, error_code, selects the query; its integer
# value is bound as the SQL LIMIT. All six are therefore result limits, which
# is why `software` and `command` are annotated as int rather than str.
# Raises HTTPException(400) "Too many results" or "No filter specified".
# NOTE(review): in this listing each "Too many results" raise directly follows
# its `is not None` test; the limit comparison that guards it (embedded lines
# 55, 59, 63, 67, 71, 75) is not shown, so the actual upper bound cannot be
# stated from here. Line 78 before the final raise is also absent --
# presumably an `else:`; confirm against the full file.
52 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
53 def api_top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
54 if blocked is not None:
56 raise HTTPException(status_code=400, detail="Too many results")
# Domains ranked by how many 'reject' block rows name them as `blocked`.
57 fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
58 elif blockers is not None:
60 raise HTTPException(status_code=400, detail="Too many results")
# Domains ranked by how many 'reject' block rows name them as `blocker`.
61 fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
62 elif reference is not None:
64 raise HTTPException(status_code=400, detail="Too many results")
# `origin` values ranked by the number of instances (with known software)
# recorded under each origin.
65 fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
66 elif software is not None:
68 raise HTTPException(status_code=400, detail="Too many results")
# Software names ranked by instance count; ties are ordered by name.
69 fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
70 elif command is not None:
72 raise HTTPException(status_code=400, detail="Too many results")
# `command` values ranked by instance count; ties are ordered by name.
73 fba.cursor.execute("SELECT command, COUNT(domain) FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY COUNT(domain) DESC, command ASC LIMIT ?", [command])
74 elif error_code is not None:
76 raise HTTPException(status_code=400, detail="Too many results")
# Status codes other than '200' ranked by how many instances last returned them.
77 fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
79 raise HTTPException(status_code=400, detail="No filter specified")
81 scores = fba.cursor.fetchall()
# Every query above yields two-column rows. The first column is unpacked as
# `domain` even when it actually holds an origin, software name, command or
# status code.
85 for domain, highscore in scores:
88 "highscore": highscore
# GET <base_url>/api/index.json -- block records selected by one of:
#   domain  - the blocked domain (several spellings are tried, see below),
#   reverse - the blocking domain (`blocker` column),
#   reason  - a keyword searched inside the `reason` column.
# `domain` takes precedence over `reverse`, which takes precedence over
# `reason`. Raises HTTPException(400) when no filter is given or the keyword
# is rejected as too short.
93 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
94 def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
95 if domain is None and reason is None and reverse is None:
96 raise HTTPException(status_code=400, detail="No filter specified")
98 if reason is not None:
# Remove the SQL LIKE wildcards `%` and `_` so the keyword is matched
# literally by the LIKE query further down.
# NOTE(review): `import re` is not among the imports visible in this listing.
99 reason = re.sub("(%|_)", "", reason)
# NOTE(review): the length test guarding this raise (embedded line 100) is
# not shown in this listing.
101 raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
103 if domain is not None:
# Keeps the last `domain.count(".")` labels, i.e. drops the first label, and
# prefixes "*." ("a.b.c" -> "*.b.c"). With no dot the slice is [-0:], which is
# the whole list, so the result is "*." + domain.
104 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# IDNA (punycode) form of the domain.
# NOTE(review): str.encode('idna') raises UnicodeError for labels it cannot
# encode; no handler is visible here.
105 punycode = domain.encode('idna').decode('utf-8')
# Six candidate spellings are compared against `blocked`: the domain itself,
# "*." + domain, the parent wildcard, fba.get_hash(domain) (helper defined
# outside this file; presumably for hashed/obfuscated entries -- confirm),
# the punycode form and "*." + punycode.
106 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
107 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
108 elif reverse is not None:
109 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
# Keyword search; rows with an empty reason are excluded.
# NOTE(review): the branch line before this statement (embedded line 110) is
# not shown -- presumably `else:`.
111 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
113 blocklist = fba.cursor.fetchall()
# Rows are grouped into `result`, keyed by block_level, each key holding a
# list of entries. The initialisation of `result` and most of the `entry`
# literal are not present in this listing.
# Note that the loop variable `reason` shadows the function parameter.
116 for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
117 if reason is not None and reason != "":
# NOTE(review): as shown, the second replace maps " " to " " and is a no-op;
# whitespace inside the literal may have been collapsed by this listing --
# verify against the full file.
118 reason = reason.replace(",", " ").replace(" ", " ")
124 "first_seen": first_seen,
125 "last_seen" : last_seen
128 if block_level in result:
129 result[block_level].append(entry)
131 result[block_level] = [entry]
# GET <base_url>/api/mutual.json?domains=a&domains=b
# Looks for a 'reject' block between the first two entries of `domains` and
# answers with an empty JSON body: status 418 when such a row exists, 200
# otherwise.
# NOTE(review): domains[0] and domains[1] are indexed below and no length
# check is visible in this listing; a request with fewer than two `domains`
# values would raise IndexError unless it is validated in lines not shown.
135 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
136 def api_mutual(domains: list[str] = Query()):
137 """Return 200 if federation is open between the two, 4xx otherwise"""
139 "SELECT block_level FROM blocks " \
# Either domain may be the blocker and either domain, or its "*." wildcard
# form, may be the blocked side. As written this also matches a row where the
# same domain appears on both sides.
140 "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
141 "AND block_level = 'reject' " \
146 "aw": "*." + domains[0],
147 "bw": "*." + domains[1],
150 response = fba.cursor.fetchone()
# A fetched row means at least one matching reject block exists.
152 if response is not None:
154 return JSONResponse(status_code=418, content={})
157 return JSONResponse(status_code=200, content={})
# GET <base_url>/scoreboard -- HTML view of one "top N" ranking.
# The handler does not query the database itself: it issues an HTTP request to
# this application's own /api/top.json endpoint (host, port and base_url taken
# from config) and renders the JSON result with views/scoreboard.html.
# The first parameter that is not None and greater than zero wins, tested in
# the order blockers, blocked, reference, software, command, error_code.
# Raises HTTPException: 400 when no usable filter is given, 500 "Could not
# determine scores", or the upstream status code when the API call is not OK.
# NOTE(review): `requests` is not among the imports visible in this listing,
# and requests.get is called without a `timeout`, so a stalled upstream call
# would block this handler indefinitely.
159 @router.get(config.get("base_url") + "/scoreboard")
160 def scoreboard(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
163 if blockers is not None and blockers > 0:
164 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
165 elif blocked is not None and blocked > 0:
166 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
167 elif reference is not None and reference > 0:
168 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
169 elif software is not None and software > 0:
170 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
171 elif command is not None and command > 0:
172 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?command={command}")
173 elif error_code is not None and error_code > 0:
174 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
# NOTE(review): the branch line before this raise (embedded line 175) is not
# shown -- presumably `else:`.
176 raise HTTPException(status_code=400, detail="No filter specified")
# NOTE(review): the condition guarding this raise (embedded line 178) is not
# shown in this listing.
179 raise HTTPException(status_code=500, detail="Could not determine scores")
180 elif not response.ok:
# Pass the upstream status code and body straight through to the client.
181 raise HTTPException(status_code=response.status_code, detail=response.text)
# Template context. Some entries (embedded lines 186-187, 189, 192) are not
# present in this listing.
183 return templates.TemplateResponse("views/scoreboard.html", {
184 "base_url" : config.get("base_url"),
185 "slogan" : config.get("slogan"),
188 "blockers" : blockers,
190 "reference" : reference,
191 "software" : software,
193 "error_code": error_code,
# Decoded through the project's network helper rather than response.json().
194 "scores" : network.json_from_response(response)
# GET <base_url>/ -- landing page.
# Fetches the counters from this application's own /api/info.json endpoint
# over HTTP and renders them with views/index.html.
197 @router.get(config.get("base_url") + "/")
198 def index(request: Request):
# NOTE(review): requests.get is called without a `timeout`.
200 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
# Upstream status and body are passed through to the client.
# NOTE(review): the condition guarding this raise (embedded lines 201-202) is
# not shown in this listing.
203 raise HTTPException(status_code=response.status_code, detail=response.text)
205 return templates.TemplateResponse("views/index.html", {
# Parsed JSON body of the info endpoint, handed to the template as "info".
207 "info" : response.json()
# GET <base_url>/top -- HTML view of block records for a domain, a reason
# keyword or a blocker (`reverse`).
# Two internal HTTP calls are made: /api/info.json for the page counters, then
# /api/index.json with whichever filter was supplied (domain, then reason,
# then reverse). With no filter the second call is skipped.
# Raises HTTPException for empty or invalid parameters and for non-OK
# upstream responses.
210 @router.get(config.get("base_url") + "/top")
211 def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
# A parameter that is present but empty is rejected; None (absent) is allowed.
# NOTE(review): status 500 is used for what is a client input error here and
# for "Invalid domain" below; the JSON API handlers in this file use 400 for
# comparable cases.
212 if domain == "" or reason == "" or reverse == "":
213 raise HTTPException(status_code=500, detail="Insufficient parameter provided")
215 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
# NOTE(review): the condition guarding this raise (embedded lines 216-217) is
# not shown in this listing.
218 raise HTTPException(status_code=response.status_code, detail=response.text)
220 info = response.json()
223 if domain is not None:
# NOTE(review): `validators` is not among the imports visible in this listing.
224 if not validators.domain(domain):
225 raise HTTPException(status_code=500, detail="Invalid domain")
227 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
228 elif reason is not None:
# NOTE(review): unlike domain/reverse, `reason` is free text and is placed in
# the query string without URL-encoding; characters such as "&" or "#" would
# change the request. Passing it via requests' `params=` would encode it.
229 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
230 elif reverse is not None:
231 if not validators.domain(reverse):
232 raise HTTPException(status_code=500, detail="Invalid domain")
234 response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
236 if response is not None:
# NOTE(review): the condition guarding this raise (embedded line 237) is not
# shown in this listing.
238 raise HTTPException(status_code=response.status_code, detail=response.text)
# The API result maps each block_level to a list of block entries.
239 blocklist = response.json()
240 for block_level in blocklist:
241 for block in blocklist[block_level]:
# Replace the numeric timestamps in place with strings formatted per the
# configured "timestamp_format", interpreted as UTC.
# NOTE(review): datetime.utcfromtimestamp is deprecated since Python 3.12.
242 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
243 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
# Most of the template context (embedded lines 246-247, 249-252) is not
# present in this listing.
245 return templates.TemplateResponse("views/top.html", {
248 "blocks" : blocklist,
# GET <base_url>/rss -- RSS feed of the 50 most recently first-seen blocks,
# optionally restricted to blocks that target `domain`. Rendered with the
# rss.xml template.
254 @router.get(config.get("base_url") + "/rss")
255 def rss(request: Request, domain: str = None):
256 if domain is not None:
# Keeps the last `domain.count(".")` labels, i.e. drops the first label, and
# prefixes "*." ("a.b.c" -> "*.b.c").
257 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# IDNA (punycode) form of the domain.
# NOTE(review): str.encode('idna') raises UnicodeError for labels it cannot
# encode; no handler is visible here.
258 punycode = domain.encode('idna').decode('utf-8')
# Six candidate spellings are compared against `blocked`: the domain itself,
# "*." + domain, the parent wildcard, fba.get_hash(domain), the punycode form
# and "*." + punycode. Newest first, capped at 50 rows.
259 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
260 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
# Unfiltered variant: the 50 newest rows of the whole table.
# NOTE(review): the branch line before this statement (embedded line 261) is
# not shown -- presumably `else:`.
262 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
264 result = fba.cursor.fetchall()
267 for blocker, blocked, block_level, reason, first_seen, last_seen in result:
# Reformat first_seen as an RFC 2822 date string for the feed.
# NOTE(review): datetime.fromtimestamp returns a naive local-time value, and
# email.utils.format_datetime labels naive datetimes with "-0000"; on a host
# whose timezone is not UTC the emitted time would be off. last_seen is passed
# on unformatted.
268 first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
269 if reason is None or reason == '':
270 reason = "No reason provided."
# NOTE(review): the branch line before this statement (embedded line 271) is
# not shown -- presumably `else:`.
272 reason = "Provided reason: '" + reason + "'"
# Part of the per-item mapping; its remaining keys and the surrounding append
# call are not present in this listing.
277 "block_level": block_level,
279 "first_seen" : first_seen,
280 "last_seen" : last_seen,
283 return templates.TemplateResponse("rss.xml", {
# Feed build time; datetime.now() is naive, see the timezone note above.
285 "timestamp": utils.format_datetime(datetime.now()),
287 "hostname" : config.get("hostname"),
# NOTE(review): "routerlication/rss+xml" is not a valid media type. It looks
# like a search-and-replace artifact ("app" -> "router") of
# "application/rss+xml"; confirm and correct, since feed readers rely on
# this header.
290 "Content-Type": "routerlication/rss+xml"
# GET <base_url>/robots.txt -- robots file rendered from the "robots.txt"
# template so that it can reference the configured base_url.
# NOTE(review): the route declares response_class=PlainTextResponse, but the
# handler returns a TemplateResponse object; verify that the Content-Type
# actually sent is text/plain.
293 @router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
294 def robots(request: Request):
295 return templates.TemplateResponse("robots.txt", {
297 "base_url": config.get("base_url")
# Script entry point: serve the application with uvicorn, taking host, port
# and log level from config.
# The import string "api:router" refers to the `router` object in a module
# named `api` -- presumably this file; confirm the filename.
# NOTE(review): `uvicorn` is not among the imports visible in this listing.
300 if __name__ == "__main__":
301 uvicorn.run("api:router", host=config.get("host"), port=config.get("port"), log_level=config.get("log_level"))