]> git.mxchange.org Git - fba.git/blob - api.py
b37e555e072b7e3185a15200b254264c95419b23
[fba.git] / api.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 from fastapi import Request, HTTPException, Query
18 from fastapi.responses import JSONResponse
19 from fastapi.responses import PlainTextResponse
20 from fastapi.templating import Jinja2Templates
21 from datetime import datetime
22 from email import utils
23
24 import fastapi
25 import uvicorn
26 import requests
27 import re
28 import fba
29
30 router = fastapi.FastAPI(docs_url=fba.config["base_url"] + "/docs", redoc_url=fba.config["base_url"] + "/redoc")
31 templates = Jinja2Templates(directory="templates")
32
33 @router.get(fba.config["base_url"] + "/api/info.json", response_class=JSONResponse)
34 def info():
35     fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
36     known, indexed, blocks, errorous = fba.cursor.fetchone()
37
38     return {
39         "known_instances"   : known,
40         "indexed_instances" : indexed,
41         "blocks_recorded"   : blocks,
42         "errorous_instances": errorous,
43         "slogan"            : fba.config["slogan"]
44     }
45
46 @router.get(fba.config["base_url"] + "/api/top.json", response_class=JSONResponse)
47 def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None):
48     if blocked != None:
49         if blocked > 500:
50             raise HTTPException(status_code=400, detail="Too many results")
51         fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
52     elif blockers != None:
53         if blockers > 500:
54             raise HTTPException(status_code=400, detail="Too many results")
55         fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
56     elif reference != None:
57         if reference > 500:
58             raise HTTPException(status_code=400, detail="Too many results")
59         fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
60     elif software != None:
61         if software > 500:
62             raise HTTPException(status_code=400, detail="Too many results")
63         fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
64     else:
65         raise HTTPException(status_code=400, detail="No filter specified")
66
67     scores = fba.cursor.fetchall()
68
69     scoreboard = []
70
71     for domain, highscore in scores:
72         scoreboard.append({
73             "domain"   : domain,
74             "highscore": highscore
75         })
76
77     return scoreboard
78
79 @router.get(fba.config["base_url"] + "/api/index.json", response_class=JSONResponse)
80 def blocked(domain: str = None, reason: str = None, reverse: str = None):
81     if domain == None and reason == None and reverse == None:
82         raise HTTPException(status_code=400, detail="No filter specified")
83
84     if reason != None:
85         reason = re.sub("(%|_)", "", reason)
86         if len(reason) < 3:
87             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
88
89     if domain != None:
90         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
91         punycode = domain.encode('idna').decode('utf-8')
92         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
93                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
94     elif reverse != None:
95         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
96     else:
97         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
98
99     blocks = fba.cursor.fetchall()
100
101     result = {}
102     for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
103         entry = {
104             "blocker"   : blocker,
105             "blocked"   : blocked,
106             "reason"    : reason,
107             "first_seen": first_seen,
108             "last_seen" : last_seen
109         }
110         if block_level in result:
111             result[block_level].append(entry)
112         else:
113             result[block_level] = [entry]
114
115     return result
116
117 @router.get(fba.config["base_url"] + "/api/mutual.json", response_class=JSONResponse)
118 def mutual(domains: list[str] = Query()):
119     """Return 200 if federation is open between the two, 4xx otherwise"""
120     fba.cursor.execute(
121         "SELECT block_level FROM blocks " \
122         "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
123         "AND block_level = 'reject' " \
124         "LIMIT 1",
125         {
126             "a" : domains[0],
127             "b" : domains[1],
128             "aw": "*." + domains[0],
129             "bw": "*." + domains[1],
130         },
131     )
132     res = fba.cursor.fetchone()
133
134     if res is not None:
135         # Blocks found
136         return JSONResponse(status_code=418, content={})
137
138     # No known blocks
139     return JSONResponse(status_code=200, content={})
140
141 @router.get(fba.config["base_url"] + "/scoreboard")
142 def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None):
143     scores = None
144
145     if blockers != None and blockers > 0:
146         res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?blockers={blockers}")
147     elif blocked != None and blocked > 0:
148         res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?blocked={blocked}")
149     elif reference != None and reference > 0:
150         res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?reference={reference}")
151     elif software != None and software > 0:
152         res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?software={software}")
153     else:
154         raise HTTPException(status_code=400, detail="No filter specified")
155
156     if res == None:
157         raise HTTPException(status_code=500, detail="Could not determine scores")
158     elif not res.ok:
159         raise HTTPException(status_code=res.status_code, detail=res.text)
160
161     return templates.TemplateResponse("scoreboard.html", {
162         "base_url"  : fba.config["base_url"],
163         "slogan"    : fba.config["slogan"],
164         "request"   : request,
165         "scoreboard": True,
166         "blockers"  : blockers,
167         "blocked"   : blocked,
168         "reference" : reference,
169         "software"  : software,
170         "scores"    : res.json()
171     })
172
173 @router.get(fba.config["base_url"] + "/")
174 def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
175     if domain == "" or reason == "" or reverse == "":
176         return fastapi.responses.RedirectResponse("/")
177
178     info = None
179     blocks = None
180
181     if domain == None and reason == None and reverse == None:
182         info = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/info.json")
183
184         if not info.ok:
185             raise HTTPException(status_code=info.status_code, detail=info.text)
186
187         info = info.json()
188     elif domain != None:
189         blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/index.json?domain={domain}")
190     elif reason != None:
191         blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/index.json?reason={reason}")
192     elif reverse != None:
193         blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/index.json?reverse={reverse}")
194
195     if blocks != None:
196         if not blocks.ok:
197             raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
198         blocks = blocks.json()
199         for block_level in blocks:
200             for block in blocks[block_level]:
201                 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime('%Y-%m-%d %H:%M')
202                 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')
203
204     return templates.TemplateResponse("index.html", {
205         "request": request,
206         "domain" : domain,
207         "blocks" : blocks,
208         "reason" : reason,
209         "reverse": reverse,
210         "info"   : info
211     })
212
213 @router.get(fba.config["base_url"] + "/rss")
214 def rss(request: Request, domain: str = None):
215     if domain != None:
216         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
217         punycode = domain.encode('idna').decode('utf-8')
218         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
219                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
220     else:
221         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
222
223     blocks = fba.cursor.fetchall()
224
225     result = []
226     for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
227         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
228         if reason == None or reason == '':
229             reason = "No reason provided."
230         else:
231             reason = "Provided reason: '" + reason + "'"
232
233         result.append({
234             "blocker"    : blocker,
235             "blocked"    : blocked,
236             "block_level": block_level,
237             "reason"     : reason,
238             "first_seen" : first_seen
239         })
240
241     timestamp = utils.format_datetime(datetime.now())
242
243     return templates.TemplateResponse("rss.xml", {
244         "request"  : request,
245         "timestamp": timestamp,
246         "domain"   : domain,
247         "blocks"   : result
248     }, headers={
249         "Content-Type": "routerlication/rss+xml"
250     })
251
252 @router.get(fba.config["base_url"] + "/robots.txt", response_class=PlainTextResponse)
253 def robots(request: Request):
254     return templates.TemplateResponse("robots.txt", {
255         "request" : request,
256         "base_url": fba.config["base_url"]
257     })
258
259 if __name__ == "__main__":
260     uvicorn.run("api:router", host=fba.config["host"], port=fba.config["port"], log_level=fba.config["log_level"])