]> git.mxchange.org Git - fba.git/blob - api.py
37b990d8851a4eeca124f2396cf95cf153d8514a
[fba.git] / api.py
1 from fastapi import FastAPI, Request, HTTPException, responses, Query
2 from fastapi.templating import Jinja2Templates
3 from datetime import datetime
4 from email import utils
5
6 import uvicorn
7 import requests
8 import sqlite3
9 import re
10 import fba
11
# The interactive documentation endpoints are mounted below the configured
# base URL, next to the API routes themselves.
app = FastAPI(
    docs_url=fba.config["base_url"] + "/docs",
    redoc_url=fba.config["base_url"] + "/redoc",
)

# Template files (index.html, rss.xml) are loaded from the directory ".".
templates = Jinja2Templates(directory=".")
14
@app.get(fba.config["base_url"] + "/info")
def info():
    """Return overall database statistics together with the configured slogan.

    A single query yields three counters: all rows in ``instances``, the rows
    in ``instances`` whose software is one of the listed platforms, and all
    rows in ``blocks``.
    """
    statistics_query = (
        "SELECT (SELECT COUNT(domain) FROM instances), "
        "(SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), "
        "(SELECT COUNT(blocker) FROM blocks)"
    )
    fba.cursor.execute(statistics_query)

    # The query always produces exactly one row with three columns.
    total_known, total_indexed, total_blocks = fba.cursor.fetchone()

    payload = {}
    payload["known_instances"] = total_known
    payload["indexed_instances"] = total_indexed
    payload["blocks_recorded"] = total_blocks
    payload["slogan"] = fba.config["slogan"]
    return payload
26
@app.get(fba.config["base_url"] + "/top")
def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None):
    """Return a ranking ("scoreboard") for exactly one of the given filters.

    The first filter that is set wins, checked in the order ``blocked``,
    ``blockers``, ``reference``, ``software``. Its value is the maximum
    number of rows to return.

    Args:
        blocked: Row limit for the most frequently rejected domains.
        blockers: Row limit for the domains issuing the most rejects.
        reference: Row limit for the origins referencing the most instances.
        software: Row limit for the most common software names.

    Returns:
        A list of ``{"domain": ..., "highscore": ...}`` dicts in the order
        delivered by the query (highest count first).

    Raises:
        HTTPException: 400 if no filter is given, if the limit exceeds 500,
            or if the limit is negative.
    """
    # (requested limit, query) pairs in priority order.
    candidates = (
        (blocked, "SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?"),
        (blockers, "SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?"),
        (reference, "SELECT origin, COUNT(domain) FROM instances GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?"),
        (software, "SELECT software, COUNT(domain) FROM instances GROUP BY software ORDER BY COUNT(domain) DESC LIMIT ?"),
    )

    for limit, query in candidates:
        if limit is None:
            continue
        if limit > 500:
            raise HTTPException(status_code=400, detail="Too many results")
        if limit < 0:
            # SQLite treats a negative LIMIT as "no upper bound", which would
            # let a caller bypass the 500-row cap above.
            raise HTTPException(status_code=400, detail="Limit must not be negative")
        fba.cursor.execute(query, [limit])
        break
    else:
        # No filter was set at all.
        raise HTTPException(status_code=400, detail="No filter specified")

    return [
        {"domain": domain, "highscore": highscore}
        for domain, highscore in fba.cursor.fetchall()
    ]
59
@app.get(fba.config["base_url"] + "/api")
def blocked(domain: str = None, reason: str = None, reverse: str = None):
    """Look up recorded blocks by blocked domain, by blocker, or by reason.

    Precedence when several filters are given: ``domain``, then ``reverse``,
    then ``reason``. A supplied ``reason`` is validated even when another
    filter takes precedence.

    Args:
        domain: Domain to look for in the ``blocked`` column. Wildcard,
            hashed and punycode variants of it are matched as well.
        reason: Keyword searched for (substring match) in the block reason.
        reverse: Domain to look for in the ``blocker`` column.

    Returns:
        A dict mapping each block level to a list of entries with the keys
        ``blocker``, ``blocked``, ``reason``, ``first_added``, ``last_seen``,
        ordered by ``first_added`` ascending.

    Raises:
        HTTPException: 400 if no filter is given or the keyword is shorter
            than three characters after removing LIKE wildcards.
    """
    if domain is None and reason is None and reverse is None:
        raise HTTPException(status_code=400, detail="No filter specified")

    if reason is not None:
        # Drop SQL LIKE wildcards so the caller cannot widen the pattern.
        reason = re.sub("(%|_)", "", reason)
        if len(reason) < 3:
            raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")

    if domain is not None:
        # "*." followed by the domain without its first label; a domain
        # without any dot is kept whole.
        wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
        try:
            punycode = domain.encode("idna").decode("utf-8")
        except UnicodeError:
            # Not IDNA-encodable (empty label, or a label longer than 63
            # characters such as a 64-character hash). Fall back to the raw
            # value instead of failing the whole request.
            punycode = domain
        fba.cursor.execute(
            "SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_added ASC",
            (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode),
        )
    elif reverse is not None:
        fba.cursor.execute(
            "SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE blocker = ? ORDER BY first_added ASC",
            [reverse],
        )
    else:
        fba.cursor.execute(
            "SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_added ASC",
            ["%" + reason + "%"],
        )

    # Group the rows by block level. Distinct loop names avoid shadowing this
    # function's own name and its ``reason`` parameter.
    result = {}
    for row_blocker, row_blocked, row_level, row_reason, first_added, last_seen in fba.cursor.fetchall():
        result.setdefault(row_level, []).append({
            "blocker"    : row_blocker,
            "blocked"    : row_blocked,
            "reason"     : row_reason,
            "first_added": first_added,
            "last_seen"  : last_seen,
        })

    return result
94
@app.get(fba.config["base_url"] + "/scoreboard")
def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None):
    """Render the scoreboard page for the first positive filter given.

    Filters are checked in the order ``blockers``, ``blocked``,
    ``reference``, ``software``. The scores themselves are fetched from this
    application's own ``/top`` endpoint over HTTP on 127.0.0.1.

    Args:
        request: Incoming request, passed on to the template.
        blockers: Number of top blockers to show.
        blocked: Number of top blocked domains to show.
        reference: Number of top referencing origins to show.
        software: Number of top software names to show.

    Returns:
        The rendered ``index.html`` template response.

    Raises:
        HTTPException: 400 if no filter has a positive value, 500 if the
            ``/top`` request could not be carried out, or the upstream status
            code if ``/top`` answered with an error.
    """
    filters = (
        ("blockers", blockers),
        ("blocked", blocked),
        ("reference", reference),
        ("software", software),
    )
    # First filter that is set to a positive number, or None.
    selected = next(
        ((name, value) for name, value in filters if value is not None and value > 0),
        None,
    )
    if selected is None:
        raise HTTPException(status_code=400, detail="No filter specified")

    name, value = selected
    try:
        res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/top?{name}={value}")
    except requests.RequestException as exc:
        # requests.get() never returns None; failures surface as exceptions.
        raise HTTPException(status_code=500, detail="Could not determine scores") from exc

    if not res.ok:
        raise HTTPException(status_code=res.status_code, detail=res.text)

    return templates.TemplateResponse("index.html", {
        "base_url"  : fba.config["base_url"],
        "request"   : request,
        "scoreboard": True,
        "blockers"  : blockers,
        "blocked"   : blocked,
        "reference" : reference,
        "software"  : software,
        "scores"    : res.json()
    })
125
@app.get(fba.config["base_url"] + "/")
def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
    """Render the start page, or the search results for one filter.

    Without any filter the page shows the statistics from this application's
    ``/info`` endpoint. Otherwise the first filter given (``domain``, then
    ``reason``, then ``reverse``) is forwarded to the ``/api`` endpoint and
    the returned timestamps are formatted for display in UTC.

    Args:
        request: Incoming request, passed on to the template.
        domain: Blocked domain to search for.
        reason: Keyword to search for in block reasons.
        reverse: Blocker domain to list blocks for.

    Returns:
        A redirect to ``/`` if any filter is an empty string, otherwise the
        rendered ``index.html`` template response.

    Raises:
        HTTPException: With the upstream status code if the internal request
            to ``/info`` or ``/api`` did not succeed.
    """
    if domain == "" or reason == "" or reverse == "":
        return responses.RedirectResponse("/")

    api_base = f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}"
    stats = None
    blocks = None

    if domain is None and reason is None and reverse is None:
        res = requests.get(f"{api_base}/info")
        if not res.ok:
            raise HTTPException(status_code=res.status_code, detail=res.text)
        stats = res.json()
    else:
        if domain is not None:
            query = {"domain": domain}
        elif reason is not None:
            query = {"reason": reason}
        else:
            query = {"reverse": reverse}

        # Let requests URL-encode the value. Interpolating it raw into the
        # query string lets characters such as "&", "#", "+" or "%" corrupt
        # the parameter or inject further ones.
        res = requests.get(f"{api_base}/api", params=query)
        if not res.ok:
            raise HTTPException(status_code=res.status_code, detail=res.text)

        blocks = res.json()
        for entries in blocks.values():
            for block in entries:
                block["first_added"] = datetime.utcfromtimestamp(block["first_added"]).strftime('%Y-%m-%d %H:%M')
                block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')

    return templates.TemplateResponse("index.html", {
        "request": request,
        "domain" : domain,
        "blocks" : blocks,
        "reason" : reason,
        "reverse": reverse,
        "info"   : stats
    })
165
@app.get(fba.config["base_url"] + "/api/mutual")
def mutual(domains: list[str] = Query()):
    """Return 200 if federation is open between the two, 4xx otherwise

    Only the first two entries of ``domains`` are compared. A 'reject' block
    recorded by either of them against either domain, or against a "*."
    wildcard of either domain, counts as a block.

    Args:
        domains: The domains to compare (repeated ``domains`` query
            parameter); at least two values are required.

    Returns:
        An empty JSON response with status 418 if a matching block exists,
        otherwise with status 200.

    Raises:
        HTTPException: 400 if fewer than two domains were supplied.
    """
    if len(domains) < 2:
        # Indexing domains[1] below would otherwise fail with an IndexError
        # and surface as a 500.
        raise HTTPException(status_code=400, detail="Two domains are required")

    first, second = domains[0], domains[1]

    fba.cursor.execute(
        "SELECT block_level FROM blocks " \
        "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
        "AND block_level = 'reject' " \
        "LIMIT 1",
        {
            "a": first,
            "b": second,
            "aw": "*." + first,
            "bw": "*." + second,
        },
    )
    res = fba.cursor.fetchone()

    if res is not None:
        # Blocks found
        return responses.JSONResponse(status_code=418, content={})

    # No known blocks
    return responses.JSONResponse(status_code=200, content={})
189
@app.get(fba.config["base_url"] + "/rss")
def rss(request: Request, domain: str = None):
    """Render an RSS feed of the 50 most recently added blocks.

    Args:
        request: Incoming request, passed on to the template.
        domain: Optional blocked domain to restrict the feed to. Wildcard,
            hashed and punycode variants of it are matched as well.

    Returns:
        The rendered ``rss.xml`` template with the content type
        ``application/rss+xml``.
    """
    if domain is not None:
        # "*." followed by the domain without its first label; a domain
        # without any dot is kept whole.
        wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
        try:
            punycode = domain.encode("idna").decode("utf-8")
        except UnicodeError:
            # Not IDNA-encodable (empty or over-long label). Fall back to the
            # raw value instead of failing the whole request.
            punycode = domain
        fba.cursor.execute(
            "SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_added DESC LIMIT 50",
            (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode),
        )
    else:
        fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks ORDER BY first_added DESC LIMIT 50")

    result = []
    for blocker, blocked, block_level, reason, first_added, _last_seen in fba.cursor.fetchall():
        if reason is None or reason == '':
            description = "No reason provided."
        else:
            description = "Provided reason: '" + reason + "'"

        result.append({
            "blocker": blocker,
            "blocked": blocked,
            "block_level": block_level,
            "reason": description,
            # Emit the date in UTC ("GMT"). A naive local-time datetime would
            # be written with a "-0000" zone and be off by the server's UTC
            # offset.
            "first_added": utils.formatdate(first_added, usegmt=True),
        })

    # Current time, likewise expressed in UTC.
    timestamp = utils.formatdate(usegmt=True)

    return templates.TemplateResponse("rss.xml", {
        "request": request,
        "timestamp": timestamp,
        "domain": domain,
        "blocks": result
    }, headers={
        "Content-Type": "application/rss+xml"
    })
228
if __name__ == "__main__":
    # Started directly as a script: serve the app on the loopback interface
    # using the port and log level from the fba configuration.
    uvicorn.run(
        "api:app",
        host="127.0.0.1",
        port=fba.config["port"],
        log_level=fba.config["log_level"],
    )