]> git.mxchange.org Git - fba.git/blob - api.py
Continued:
[fba.git] / api.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 from datetime import datetime
18 from email import utils
19
20 import re
21
22 from fastapi import Request, HTTPException, Query
23 from fastapi.responses import JSONResponse
24 from fastapi.responses import PlainTextResponse
25 from fastapi.templating import Jinja2Templates
26
27 import fastapi
28 import uvicorn
29 import requests
30 import validators
31
32 from fba import config
33 from fba import fba
34
35 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
36 templates = Jinja2Templates(directory="templates")
37
38 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
39 def api_info():
40     fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
41     known, indexed, blocklist, errorous = fba.cursor.fetchone()
42
43     return {
44         "known_instances"   : known,
45         "indexed_instances" : indexed,
46         "blocks_recorded"   : blocklist,
47         "errorous_instances": errorous,
48         "slogan"            : config.get("slogan")
49     }
50
51 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
52 def api_top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
53     if blocked is not None:
54         if blocked > 500:
55             raise HTTPException(status_code=400, detail="Too many results")
56         fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
57     elif blockers is not None:
58         if blockers > 500:
59             raise HTTPException(status_code=400, detail="Too many results")
60         fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
61     elif reference is not None:
62         if reference > 500:
63             raise HTTPException(status_code=400, detail="Too many results")
64         fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
65     elif software is not None:
66         if software > 500:
67             raise HTTPException(status_code=400, detail="Too many results")
68         fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
69     elif command is not None:
70         if command > 500:
71             raise HTTPException(status_code=400, detail="Too many results")
72         fba.cursor.execute("SELECT command, COUNT(domain) FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY COUNT(domain) DESC, command ASC LIMIT ?", [command])
73     elif error_code is not None:
74         if error_code > 500:
75             raise HTTPException(status_code=400, detail="Too many results")
76         fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
77     else:
78         raise HTTPException(status_code=400, detail="No filter specified")
79
80     scores = fba.cursor.fetchall()
81
82     scores = []
83
84     for domain, highscore in scores:
85         scores.append({
86             "domain"   : domain,
87             "highscore": highscore
88         })
89
90     return scores
91
92 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
93 def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
94     if domain is None and reason is None and reverse is None:
95         raise HTTPException(status_code=400, detail="No filter specified")
96
97     if reason is not None:
98         reason = re.sub("(%|_)", "", reason)
99         if len(reason) < 3:
100             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
101
102     if domain is not None:
103         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
104         punycode = domain.encode('idna').decode('utf-8')
105         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
106                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
107     elif reverse is not None:
108         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
109     else:
110         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
111
112     blocklist = fba.cursor.fetchall()
113
114     result = {}
115     for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
116         if reason is not None and reason != "":
117             reason = reason.replace(",", " ").replace("  ", " ")
118
119         entry = {
120             "blocker"   : blocker,
121             "blocked"   : blocked,
122             "reason"    : reason,
123             "first_seen": first_seen,
124             "last_seen" : last_seen
125         }
126
127         if block_level in result:
128             result[block_level].append(entry)
129         else:
130             result[block_level] = [entry]
131
132     return result
133
134 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
135 def api_mutual(domains: list[str] = Query()):
136     """Return 200 if federation is open between the two, 4xx otherwise"""
137     fba.cursor.execute(
138         "SELECT block_level FROM blocks " \
139         "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
140         "AND block_level = 'reject' " \
141         "LIMIT 1",
142         {
143             "a" : domains[0],
144             "b" : domains[1],
145             "aw": "*." + domains[0],
146             "bw": "*." + domains[1],
147         },
148     )
149     response = fba.cursor.fetchone()
150
151     if response is not None:
152         # Blocks found
153         return JSONResponse(status_code=418, content={})
154
155     # No known blocks
156     return JSONResponse(status_code=200, content={})
157
158 @router.get(config.get("base_url") + "/scoreboard")
159 def scoreboard(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
160     if blockers is not None and blockers > 0:
161         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
162     elif blocked is not None and blocked > 0:
163         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
164     elif reference is not None and reference > 0:
165         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
166     elif software is not None and software > 0:
167         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
168     elif command is not None and command > 0:
169         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?command={command}")
170     elif error_code is not None and error_code > 0:
171         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
172     else:
173         raise HTTPException(status_code=400, detail="No filter specified")
174
175     if response is None:
176         raise HTTPException(status_code=500, detail="Could not determine scores")
177     elif not response.ok:
178         raise HTTPException(status_code=response.status_code, detail=response.text)
179
180     return templates.TemplateResponse("views/scoreboard.html", {
181         "base_url"  : config.get("base_url"),
182         "slogan"    : config.get("slogan"),
183         "request"   : request,
184         "scoreboard": True,
185         "blockers"  : blockers,
186         "blocked"   : blocked,
187         "reference" : reference,
188         "software"  : software,
189         "command"   : command,
190         "error_code": error_code,
191         "scores"    : fba.json_from_response(response)
192     })
193
194 @router.get(config.get("base_url") + "/")
195 def index(request: Request):
196     # Get info
197     response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
198
199     if not response.ok:
200         raise HTTPException(status_code=response.status_code, detail=response.text)
201
202     return templates.TemplateResponse("views/index.html", {
203         "request": request,
204         "info"   : response.json()
205     })
206
207 @router.get(config.get("base_url") + "/top")
208 def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
209     if domain == "" or reason == "" or reverse == "":
210         raise HTTPException(status_code=500, detail="Insufficient parameter provided")
211
212     response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
213
214     if not response.ok:
215         raise HTTPException(status_code=response.status_code, detail=response.text)
216
217     info = response.json()
218     response = None
219
220     if domain is not None:
221         if not validators.domain(domain):
222             raise HTTPException(status_code=500, detail="Invalid domain")
223
224         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
225     elif reason is not None:
226         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
227     elif reverse is not None:
228         if not validators.domain(reverse):
229             raise HTTPException(status_code=500, detail="Invalid domain")
230
231         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
232
233     if response is not None:
234         if not response.ok:
235             raise HTTPException(status_code=response.status_code, detail=response.text)
236         blocklist = response.json()
237         for block_level in blocklist:
238             for block in blocklist[block_level]:
239                 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
240                 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
241
242     return templates.TemplateResponse("views/top.html", {
243         "request": request,
244         "domain" : domain,
245         "blocks" : blocklist,
246         "reason" : reason,
247         "reverse": reverse,
248         "info"   : info
249     })
250
251 @router.get(config.get("base_url") + "/rss")
252 def rss(request: Request, domain: str = None):
253     if domain is not None:
254         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
255         punycode = domain.encode('idna').decode('utf-8')
256         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
257                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
258     else:
259         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
260
261     result = fba.cursor.fetchall()
262
263     blocklist = []
264     for blocker, blocked, block_level, reason, first_seen, last_seen in result:
265         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
266         if reason is None or reason == '':
267             reason = "No reason provided."
268         else:
269             reason = "Provided reason: '" + reason + "'"
270
271         blocklist.append({
272             "blocker"    : blocker,
273             "blocked"    : blocked,
274             "block_level": block_level,
275             "reason"     : reason,
276             "first_seen" : first_seen,
277             "last_seen"  : last_seen,
278         })
279
280     return templates.TemplateResponse("rss.xml", {
281         "request"  : request,
282         "timestamp": utils.format_datetime(datetime.now()),
283         "domain"   : domain,
284         "hostname" : config.get("hostname"),
285         "blocks"   : blocklist
286     }, headers={
287         "Content-Type": "routerlication/rss+xml"
288     })
289
290 @router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
291 def robots(request: Request):
292     return templates.TemplateResponse("robots.txt", {
293         "request" : request,
294         "base_url": config.get("base_url")
295     })
296
297 if __name__ == "__main__":
298     uvicorn.run("api:router", host=config.get("host"), port=config.get("port"), log_level=config.get("log_level"))