1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from fastapi import Request, HTTPException, responses, Query
18 from fastapi.responses import PlainTextResponse
19 from fastapi.templating import Jinja2Templates
20 from datetime import datetime
21 from email import utils
29 router = fastapi.FastAPI(docs_url=fba.config["base_url"] + "/docs", redoc_url=fba.config["base_url"] + "/redoc")
30 templates = Jinja2Templates(directory="templates")
32 @router.get(fba.config["base_url"] + "/api/info")
34 fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
35 known, indexed, blocks, errorous = fba.cursor.fetchone()
38 "known_instances" : known,
39 "indexed_instances" : indexed,
40 "blocks_recorded" : blocks,
41 "errorous_instances": errorous,
42 "slogan" : fba.config["slogan"]
45 @router.get(fba.config["base_url"] + "/api/top")
46 def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None):
49 raise HTTPException(status_code=400, detail="Too many results")
50 fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
51 elif blockers != None:
53 raise HTTPException(status_code=400, detail="Too many results")
54 fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
55 elif reference != None:
57 raise HTTPException(status_code=400, detail="Too many results")
58 fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
59 elif software != None:
61 raise HTTPException(status_code=400, detail="Too many results")
62 fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
64 raise HTTPException(status_code=400, detail="No filter specified")
66 scores = fba.cursor.fetchall()
70 for domain, highscore in scores:
73 "highscore": highscore
78 @router.get(fba.config["base_url"] + "/api")
79 def blocked(domain: str = None, reason: str = None, reverse: str = None):
80 if domain == None and reason == None and reverse == None:
81 raise HTTPException(status_code=400, detail="No filter specified")
84 reason = re.sub("(%|_)", "", reason)
86 raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
89 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
90 punycode = domain.encode('idna').decode('utf-8')
91 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
92 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
94 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
96 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
98 blocks = fba.cursor.fetchall()
101 for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
106 "first_seen": first_seen,
107 "last_seen" : last_seen
109 if block_level in result:
110 result[block_level].append(entry)
112 result[block_level] = [entry]
116 @router.get(fba.config["base_url"] + "/scoreboard")
117 def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None):
120 if blockers != None and blockers > 0:
121 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?blockers={blockers}")
122 elif blocked != None and blocked > 0:
123 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?blocked={blocked}")
124 elif reference != None and reference > 0:
125 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?reference={reference}")
126 elif software != None and software > 0:
127 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?software={software}")
129 raise HTTPException(status_code=400, detail="No filter specified")
132 raise HTTPException(status_code=500, detail="Could not determine scores")
134 raise HTTPException(status_code=res.status_code, detail=res.text)
136 return templates.TemplateResponse("scoreboard.html", {
137 "base_url" : fba.config["base_url"],
138 "slogan" : fba.config["slogan"],
141 "blockers" : blockers,
143 "reference" : reference,
144 "software" : software,
145 "scores" : res.json()
148 @router.get(fba.config["base_url"] + "/")
149 def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
150 if domain == "" or reason == "" or reverse == "":
151 return responses.RedirectResponse("/")
156 if domain == None and reason == None and reverse == None:
157 info = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/info")
160 raise HTTPException(status_code=info.status_code, detail=info.text)
164 blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?domain={domain}")
166 blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?reason={reason}")
167 elif reverse != None:
168 blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?reverse={reverse}")
172 raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
173 blocks = blocks.json()
174 for block_level in blocks:
175 for block in blocks[block_level]:
176 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime('%Y-%m-%d %H:%M')
177 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')
179 return templates.TemplateResponse("index.html", {
188 @router.get(fba.config["base_url"] + "/api/mutual")
189 def mutual(domains: list[str] = Query()):
190 """Return 200 if federation is open between the two, 4xx otherwise"""
192 "SELECT block_level FROM blocks " \
193 "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
194 "AND block_level = 'reject' " \
199 "aw": "*." + domains[0],
200 "bw": "*." + domains[1],
203 res = fba.cursor.fetchone()
207 return responses.JSONResponse(status_code=418, content={})
210 return responses.JSONResponse(status_code=200, content={})
212 @router.get(fba.config["base_url"] + "/rss")
213 def rss(request: Request, domain: str = None):
215 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
216 punycode = domain.encode('idna').decode('utf-8')
217 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
218 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
220 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
222 blocks = fba.cursor.fetchall()
225 for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
226 first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
227 if reason == None or reason == '':
228 reason = "No reason provided."
230 reason = "Provided reason: '" + reason + "'"
235 "block_level": block_level,
237 "first_seen" : first_seen
240 timestamp = utils.format_datetime(datetime.now())
242 return templates.TemplateResponse("rss.xml", {
244 "timestamp": timestamp,
248 "Content-Type": "routerlication/rss+xml"
251 @router.get(fba.config["base_url"] + "/robots.txt", response_class=PlainTextResponse)
252 def robots(request: Request):
253 return templates.TemplateResponse("robots.txt", {
255 "base_url": fba.config["base_url"]
257 "Content-Type": "text/plain"
260 if __name__ == "__main__":
261 uvicorn.run("api:router", host=fba.config["host"], port=fba.config["port"], log_level=fba.config["log_level"])