]> git.mxchange.org Git - fba.git/blob - api.py
Continued:
[fba.git] / api.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 from datetime import datetime
18 from email import utils
19
20 import re
21
22 from fastapi import Request, HTTPException, Query
23 from fastapi.responses import JSONResponse
24 from fastapi.responses import PlainTextResponse
25 from fastapi.templating import Jinja2Templates
26
27 import fastapi
28 import uvicorn
29 import requests
30 import validators
31
32 from fba import config
33 from fba import fba
34 from fba import network
35
36 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
37 templates = Jinja2Templates(directory="templates")
38
39 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
40 def api_info():
41     fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
42     known, indexed, blocklist, errorous = fba.cursor.fetchone()
43
44     return {
45         "known_instances"   : known,
46         "indexed_instances" : indexed,
47         "blocks_recorded"   : blocklist,
48         "errorous_instances": errorous,
49         "slogan"            : config.get("slogan")
50     }
51
52 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
53 def api_top(mode: str, amount: int):
54     if amount > 500:
55         raise HTTPException(status_code=400, detail="Too many results")
56
57     if mode == "blocked":
58         fba.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
59     elif mode == "blocker":
60         fba.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
61     elif mode == "reference":
62         fba.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
63     elif mode == "software":
64         fba.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
65     elif mode == "command":
66         fba.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
67     elif mode == "error_code":
68         fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
69     else:
70         raise HTTPException(status_code=400, detail="No filter specified")
71
72     scores = list()
73
74     for domain, score in fba.cursor.fetchall():
75         scores.append({
76             "domain": domain,
77             "score" : score
78         })
79
80     return scores
81
82 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
83 def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
84     if domain is None and reason is None and reverse is None:
85         raise HTTPException(status_code=400, detail="No filter specified")
86
87     if reason is not None:
88         reason = re.sub("(%|_)", "", reason)
89         if len(reason) < 3:
90             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
91
92     if domain is not None:
93         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
94         punycode = domain.encode('idna').decode('utf-8')
95         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
96                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
97     elif reverse is not None:
98         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
99     else:
100         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
101
102     blocklist = fba.cursor.fetchall()
103
104     result = {}
105     for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
106         if reason is not None and reason != "":
107             reason = reason.replace(",", " ").replace("  ", " ")
108
109         entry = {
110             "blocker"   : blocker,
111             "blocked"   : blocked,
112             "reason"    : reason,
113             "first_seen": first_seen,
114             "last_seen" : last_seen
115         }
116
117         if block_level in result:
118             result[block_level].append(entry)
119         else:
120             result[block_level] = [entry]
121
122     return result
123
124 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
125 def api_mutual(domains: list[str] = Query()):
126     """Return 200 if federation is open between the two, 4xx otherwise"""
127     fba.cursor.execute(
128         "SELECT block_level FROM blocks " \
129         "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
130         "AND block_level = 'reject' " \
131         "LIMIT 1",
132         {
133             "a" : domains[0],
134             "b" : domains[1],
135             "aw": "*." + domains[0],
136             "bw": "*." + domains[1],
137         },
138     )
139     response = fba.cursor.fetchone()
140
141     if response is not None:
142         # Blocks found
143         return JSONResponse(status_code=418, content={})
144
145     # No known blocks
146     return JSONResponse(status_code=200, content={})
147
148 @router.get(config.get("base_url") + "/scoreboard")
149 def scoreboard(request: Request, mode: str, amount: int):
150     response = None
151
152     if mode == "blocker" and amount > 0:
153         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocker&amount={amount}")
154     elif mode == "blocked" and amount > 0:
155         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocked&amount={amount}")
156     elif mode == "reference" and amount > 0:
157         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=reference&amount={amount}")
158     elif mode == "software" and amount > 0:
159         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=software&amount={amount}")
160     elif mode == "command" and amount > 0:
161         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=command&amount={amount}")
162     elif mode == "error_code" and amount > 0:
163         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=error_code&amount={amount}")
164     else:
165         raise HTTPException(status_code=400, detail="No filter specified")
166
167     if response is None:
168         raise HTTPException(status_code=500, detail="Could not determine scores")
169     elif not response.ok:
170         raise HTTPException(status_code=response.status_code, detail=response.text)
171
172     return templates.TemplateResponse("views/scoreboard.html", {
173         "base_url"  : config.get("base_url"),
174         "slogan"    : config.get("slogan"),
175         "request"   : request,
176         "scoreboard": True,
177         "mode"      : mode,
178         "amount"    : amount,
179         "scores"    : network.json_from_response(response)
180     })
181
182 @router.get(config.get("base_url") + "/")
183 def index(request: Request):
184     # Get info
185     response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
186
187     if not response.ok:
188         raise HTTPException(status_code=response.status_code, detail=response.text)
189
190     return templates.TemplateResponse("views/index.html", {
191         "request": request,
192         "info"   : response.json()
193     })
194
195 @router.get(config.get("base_url") + "/top")
196 def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
197     if domain == "" or reason == "" or reverse == "":
198         raise HTTPException(status_code=500, detail="Insufficient parameter provided")
199
200     response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
201
202     if not response.ok:
203         raise HTTPException(status_code=response.status_code, detail=response.text)
204
205     info = response.json()
206     response = None
207
208     if domain is not None:
209         if not validators.domain(domain):
210             raise HTTPException(status_code=500, detail="Invalid domain")
211
212         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
213     elif reason is not None:
214         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
215     elif reverse is not None:
216         if not validators.domain(reverse):
217             raise HTTPException(status_code=500, detail="Invalid domain")
218
219         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
220
221     if response is not None:
222         if not response.ok:
223             raise HTTPException(status_code=response.status_code, detail=response.text)
224         blocklist = response.json()
225         for block_level in blocklist:
226             for block in blocklist[block_level]:
227                 block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
228                 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
229
230     return templates.TemplateResponse("views/top.html", {
231         "request": request,
232         "domain" : domain,
233         "blocks" : blocklist,
234         "reason" : reason,
235         "reverse": reverse,
236         "info"   : info
237     })
238
239 @router.get(config.get("base_url") + "/rss")
240 def rss(request: Request, domain: str = None):
241     if domain is not None:
242         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
243         punycode = domain.encode('idna').decode('utf-8')
244         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
245                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
246     else:
247         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
248
249     result = fba.cursor.fetchall()
250
251     blocklist = []
252     for blocker, blocked, block_level, reason, first_seen, last_seen in result:
253         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
254         if reason is None or reason == '':
255             reason = "No reason provided."
256         else:
257             reason = "Provided reason: '" + reason + "'"
258
259         blocklist.append({
260             "blocker"    : blocker,
261             "blocked"    : blocked,
262             "block_level": block_level,
263             "reason"     : reason,
264             "first_seen" : first_seen,
265             "last_seen"  : last_seen,
266         })
267
268     return templates.TemplateResponse("rss.xml", {
269         "request"  : request,
270         "timestamp": utils.format_datetime(datetime.now()),
271         "domain"   : domain,
272         "hostname" : config.get("hostname"),
273         "blocks"   : blocklist
274     }, headers={
275         "Content-Type": "routerlication/rss+xml"
276     })
277
278 @router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
279 def robots(request: Request):
280     return templates.TemplateResponse("robots.txt", {
281         "request" : request,
282         "base_url": config.get("base_url")
283     })
284
285 if __name__ == "__main__":
286     uvicorn.run("api:router", host=config.get("host"), port=config.get("port"), log_level=config.get("log_level"))