1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 from fastapi import Request, HTTPException, responses, Query
18 from fastapi.templating import Jinja2Templates
19 from datetime import datetime
20 from email import utils
# FastAPI application object. The Swagger UI and ReDoc pages are mounted
# under the configured base URL instead of the framework defaults.
app = fastapi.FastAPI(
    redoc_url=fba.config["base_url"] + "/redoc",
    docs_url=fba.config["base_url"] + "/docs",
)

# Template files are looked up relative to the current working directory.
templates = Jinja2Templates(directory=".")
# GET <base_url>/api/info: summary counters for the landing page.
# NOTE(review): the decorated definition line and the return statement that
# wraps the mapping below are not visible in this excerpt.
31 @app.get(fba.config["base_url"] + "/api/info")
# One SELECT built from four scalar sub-queries, in this order:
#   1. COUNT(domain) over all rows of `instances`
#   2. COUNT(domain) over instances whose software is in the listed set
#   3. COUNT(blocker) over `blocks`
#   4. COUNT(domain) over instances with a non-NULL last_status_code
33 fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
# The single result row is unpacked positionally, matching the sub-query order.
34 known, indexed, blocks, errorous = fba.cursor.fetchone()
# Response fields: the four counters plus the slogan, which is read from the
# configuration rather than from the database.
37 "known_instances" : known,
38 "indexed_instances" : indexed,
39 "blocks_recorded" : blocks,
40 "errorous_instances": errorous,
41 "slogan" : fba.config["slogan"]
# GET <base_url>/api/top: ranked (name, count) lists.
44 @app.get(fba.config["base_url"] + "/api/top")
# Each integer parameter is used as the LIMIT of its own ranking query; the
# visible branches are tested in the order blocked, blockers, reference,
# software.
45 def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None):
# NOTE(review): the conditions guarding the four "Too many results" errors are
# not visible in this excerpt. Confirm the upper bound, and confirm that zero
# and negative values are rejected: if the backing store is SQLite, a negative
# LIMIT means "no upper bound".
48 raise HTTPException(status_code=400, detail="Too many results")
# Most frequently blocked names, counting only rows with block_level 'reject'.
49 fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
50 elif blockers != None:
52 raise HTTPException(status_code=400, detail="Too many results")
# Blockers with the most 'reject' rows.
53 fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
54 elif reference != None:
56 raise HTTPException(status_code=400, detail="Too many results")
# Number of instances per `origin` value.
57 fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
58 elif software != None:
60 raise HTTPException(status_code=400, detail="Too many results")
# Number of instances per software; NULL software is excluded and ties are
# ordered by software name.
61 fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
# Presumably the fall-through branch when no parameter was supplied - confirm.
63 raise HTTPException(status_code=400, detail="No filter specified")
# Every query above yields two columns, unpacked below as (domain, highscore)
# regardless of which ranking was selected.
65 scores = fba.cursor.fetchall()
69 for domain, highscore in scores:
72 "highscore": highscore
# GET <base_url>/api: block records filtered by blocked domain, by a keyword
# in the reason, or by blocker ("reverse").
77 @app.get(fba.config["base_url"] + "/api")
78 def blocked(domain: str = None, reason: str = None, reverse: str = None):
# At least one of the three filters is required.
79 if domain == None and reason == None and reverse == None:
80 raise HTTPException(status_code=400, detail="No filter specified")
# Remove the LIKE wildcard characters (% and _) from the keyword so the caller
# cannot supply a pattern of their own to the LIKE query further down.
83 reason = re.sub("(%|_)", "", reason)
# NOTE(review): the length check guarding this error is not visible here.
85 raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
# "*." plus the domain with its first label removed, e.g. "a.b.example" ->
# "*.b.example". A name without any dot keeps its only label ("*.localhost"),
# because the slice [-0:] selects the whole list.
88 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# IDNA-encoded form of the domain, decoded back to text.
89 punycode = domain.encode('idna').decode('utf-8')
# Match `blocked` against six candidates: the domain, "*." + domain, the
# wildcard computed above, fba.get_hash(domain) (presumably the form stored for
# obfuscated entries - confirm), the punycode form, and "*." + punycode.
90 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
91 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
# Reverse lookup: all rows whose blocker equals the given value.
93 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
# Keyword search: substring match against non-empty reasons.
95 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
97 blocks = fba.cursor.fetchall()
# Rows are grouped into `result` by block_level. Note that the loop variables
# reuse the names `blocked` (this function) and `reason` (its parameter).
100 for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
105 "first_seen": first_seen,
106 "last_seen" : last_seen
# Append to the existing list for this level, or start a new one.
108 if block_level in result:
109 result[block_level].append(entry)
111 result[block_level] = [entry]
# GET <base_url>/scoreboard: renders a ranking page from this application's
# own /api/top endpoint.
115 @app.get(fba.config["base_url"] + "/scoreboard")
# NOTE(review): the handler for <base_url>/ later in this file is also named
# `index`. Both routes are registered by their decorators, but the module-level
# name ends up bound to the later function.
116 def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None):
# The first parameter that is set and positive, in the order blockers, blocked,
# reference, software, selects the ranking. The data is fetched over HTTP from
# the configured host, port and base_url.
119 if blockers != None and blockers > 0:
120 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?blockers={blockers}")
121 elif blocked != None and blocked > 0:
122 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?blocked={blocked}")
123 elif reference != None and reference > 0:
124 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?reference={reference}")
125 elif software != None and software > 0:
126 res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?software={software}")
# Presumably the fall-through branch when no usable parameter was given - confirm.
128 raise HTTPException(status_code=400, detail="No filter specified")
# NOTE(review): the conditions guarding the next two errors are not visible in
# this excerpt. The second one forwards the API's status code and body.
131 raise HTTPException(status_code=500, detail="Could not determine scores")
133 raise HTTPException(status_code=res.status_code, detail=res.text)
# Template context (only partly visible here): the parameters are echoed back
# and the decoded JSON body of the API response is passed as "scores".
135 return templates.TemplateResponse("index.html", {
136 "base_url" : fba.config["base_url"],
139 "blockers" : blockers,
141 "reference" : reference,
142 "software" : software,
143 "scores" : res.json()
# GET <base_url>/: landing page, or block search results when a filter is set.
146 @app.get(fba.config["base_url"] + "/")
147 def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
# A filter that is present but empty redirects instead of searching.
# NOTE(review): the target is the literal "/" rather than base_url + "/" -
# confirm this is intended when base_url is not empty.
148 if domain == "" or reason == "" or reverse == "":
149 return responses.RedirectResponse("/")
# No filter at all: fetch the summary counters from this application's own
# /api/info endpoint.
154 if domain == None and reason == None and reverse == None:
155 info = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/info")
# Forward the API's status code and body (guarding condition not visible here).
158 raise HTTPException(status_code=info.status_code, detail=info.text)
# Otherwise query /api with the selected filter.
# NOTE(review): the values are interpolated into the query string without
# URL-encoding; characters such as "&" or "#" would alter the request.
162 blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?domain={domain}")
164 blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?reason={reason}")
165 elif reverse != None:
166 blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?reverse={reverse}")
# Forward the API's status code and body (guarding condition not visible here).
170 raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
# From here on `blocks` is the decoded JSON, iterated as a mapping of
# block level -> list of entries.
171 blocks = blocks.json()
# Replace both timestamps in place with 'YYYY-MM-DD HH:MM' strings in UTC.
# NOTE(review): datetime.utcfromtimestamp() is deprecated since Python 3.12.
172 for block_level in blocks:
173 for block in blocks[block_level]:
174 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime('%Y-%m-%d %H:%M')
175 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')
# The template context is not visible in this excerpt.
177 return templates.TemplateResponse("index.html", {
# GET <base_url>/api/mutual: checks for 'reject' blocks between two domains.
186 @app.get(fba.config["base_url"] + "/api/mutual")
# `domains` is a list-valued query parameter. The visible bindings below read
# domains[0] and domains[1].
# NOTE(review): no minimum length is declared on Query(), so a request with
# fewer than two values would raise IndexError at domains[1] unless that is
# validated on a line not visible here.
187 def mutual(domains: list[str] = Query()):
188 """Return 200 if federation is open between the two, 4xx otherwise"""
# The query selects 'reject' rows where the blocker is either domain and the
# blocked value is either domain or either domain's "*." wildcard form.
# NOTE(review): as written this also matches a row where a domain blocks
# itself (blocker = :a and blocked = :a) - confirm that is acceptable.
190 "SELECT block_level FROM blocks " \
191 "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
192 "AND block_level = 'reject' " \
197 "aw": "*." + domains[0],
198 "bw": "*." + domains[1],
# Only the first matching row is fetched.
# NOTE(review): the condition choosing between the two responses below is not
# visible in this excerpt; per the docstring, 200 means federation is open.
201 res = fba.cursor.fetchone()
205 return responses.JSONResponse(status_code=418, content={})
208 return responses.JSONResponse(status_code=200, content={})
# GET <base_url>/rss: feed of the 50 most recently first-seen block records,
# optionally restricted to one blocked domain.
210 @app.get(fba.config["base_url"] + "/rss")
211 def rss(request: Request, domain: str = None):
# "*." plus the domain with its first label removed (a name without a dot
# keeps its only label).
213 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# IDNA-encoded form of the domain, decoded back to text.
214 punycode = domain.encode('idna').decode('utf-8')
# Domain-specific feed: match `blocked` against the domain, "*." + domain, the
# wildcard above, fba.get_hash(domain), the punycode form and "*." + punycode.
215 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
216 (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
# Unfiltered feed: newest 50 rows overall.
218 fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
220 blocks = fba.cursor.fetchall()
223 for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
# Convert the stored timestamp to an RFC 2822 date string.
# NOTE(review): datetime.fromtimestamp() without a tz gives a naive local-time
# value, which email.utils.format_datetime() renders with a "-0000" zone -
# confirm feed readers interpret that as intended.
224 first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
# Human-readable reason text for the feed item.
225 if reason == None or reason == '':
226 reason = "No reason provided."
228 reason = "Provided reason: '" + reason + "'"
# Feed item fields (only partly visible in this excerpt).
233 "block_level": block_level,
235 "first_seen" : first_seen
# Feed build time, formatted the same way (naive local time).
238 timestamp = utils.format_datetime(datetime.now())
# Rendered from rss.xml and served with an RSS content type.
240 return templates.TemplateResponse("rss.xml", {
242 "timestamp": timestamp,
246 "Content-Type": "application/rss+xml"
if __name__ == "__main__":
    # Direct execution only: start uvicorn with the "api:app" import string
    # and the host, port and log level taken from the configuration.
    uvicorn.run(
        "api:app",
        host=fba.config["host"],
        port=fba.config["port"],
        log_level=fba.config["log_level"],
    )