]> git.mxchange.org Git - fba.git/blob - api.py
Continued:
[fba.git] / api.py
1 from fastapi import FastAPI, Request, HTTPException, responses, Query
2 from fastapi.templating import Jinja2Templates
3 from datetime import datetime
4 from email import utils
5
6 import uvicorn
7 import requests
8 import sqlite3
9 import re
10 import fba
11
12 app = FastAPI(docs_url=fba.config["base_url"] + "/docs", redoc_url=fba.config["base_url"] + "/redoc")
13 templates = Jinja2Templates(directory=".")
14
15 @app.get(fba.config["base_url"] + "/info")
16 def info():
17     fba.c.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), (SELECT COUNT(blocker) FROM blocks)")
18     known, indexed, blocks = fba.c.fetchone()
19
20     return {
21         "known_instances"  : known,
22         "indexed_instances": indexed,
23         "blocks_recorded"  : blocks,
24         "slogan"           : fba.config["slogan"]
25     }
26
27 @app.get(fba.config["base_url"] + "/top")
28 def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None):
29     if blocked != None:
30         if blocked > 500:
31             raise HTTPException(status_code=400, detail="Too many results")
32         fba.c.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
33     elif blockers != None:
34         if blockers > 500:
35             raise HTTPException(status_code=400, detail="Too many results")
36         fba.c.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
37     elif reference != None:
38         if reference > 500:
39             raise HTTPException(status_code=400, detail="Too many results")
40         fba.c.execute("SELECT origin, COUNT(domain) FROM instances GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
41     elif software != None:
42         if software > 500:
43             raise HTTPException(status_code=400, detail="Too many results")
44         fba.c.execute("SELECT software, COUNT(domain) FROM instances GROUP BY software ORDER BY COUNT(domain) DESC LIMIT ?", [software])
45     else:
46         raise HTTPException(status_code=400, detail="No filter specified")
47
48     scores = fba.c.fetchall()
49
50     scoreboard = []
51
52     for domain, highscore in scores:
53         scoreboard.append({"domain": domain, "highscore": highscore})
54
55     return scoreboard
56
57 @app.get(fba.config["base_url"] + "/api")
58 def blocked(domain: str = None, reason: str = None, reverse: str = None):
59     if domain == None and reason == None and reverse == None:
60         raise HTTPException(status_code=400, detail="No filter specified")
61     if reason != None:
62         reason = re.sub("(%|_)", "", reason)
63         if len(reason) < 3:
64             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
65     if domain != None:
66         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
67         punycode = domain.encode('idna').decode('utf-8')
68         fba.c.execute("SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_added ASC",
69                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
70     elif reverse != None:
71         fba.c.execute("SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE blocker = ? ORDER BY first_added ASC", [reverse])
72     else:
73         fba.c.execute("SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_added ASC", ["%" + reason + "%"])
74     blocks = fba.c.fetchall()
75
76     result = {}
77     for blocker, blocked, block_level, reason, first_added, last_seen in blocks:
78         entry = {
79             "blocker"    : blocker,
80             "blocked"    : blocked,
81             "reason"     : reason,
82             "first_added": first_added,
83             "last_seen"  : last_seen
84         }
85         if block_level in result:
86             result[block_level].append(entry)
87         else:
88             result[block_level] = [entry]
89
90     return result
91
92 @app.get(fba.config["base_url"] + "/scoreboard")
93 def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None):
94     scores = None
95
96     if blockers != None and blockers > 0:
97         res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/top?blockers={blockers}")
98     elif blocked != None and blocked > 0:
99         res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/top?blocked={blocked}")
100     elif reference != None and reference > 0:
101         res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/top?reference={reference}")
102     elif software != None and software > 0:
103         res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/top?software={software}")
104     else:
105         raise HTTPException(status_code=400, detail="No filter specified")
106
107     if res == None:
108         raise HTTPException(status_code=500, detail="Could not determine scores")
109     elif not res.ok:
110         raise HTTPException(status_code=res.status_code, detail=res.text)
111
112     return templates.TemplateResponse("index.html", {
113         "base_url"  : fba.config["base_url"],
114         "request"   : request,
115         "scoreboard": True,
116         "blockers"  : blockers,
117         "blocked"   : blocked,
118         "reference" : reference,
119         "software"  : software,
120         "scores"    : res.json()
121     })
122
123 @app.get(fba.config["base_url"] + "/")
124 def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
125     if domain == "" or reason == "" or reverse == "":
126         return responses.RedirectResponse("/")
127
128     info = None
129     blocks = None
130
131     if domain == None and reason == None and reverse == None:
132         info = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/info")
133
134         if not info.ok:
135             raise HTTPException(status_code=info.status_code, detail=info.text)
136
137         info = info.json()
138     elif domain != None:
139         blocks = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api?domain={domain}")
140     elif reason != None:
141         blocks = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api?reason={reason}")
142     elif reverse != None:
143         blocks = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api?reverse={reverse}")
144
145     if blocks != None:
146         if not blocks.ok:
147             raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
148         blocks = blocks.json()
149         for block_level in blocks:
150             for block in blocks[block_level]:
151                 block["first_added"] = datetime.utcfromtimestamp(block["first_added"]).strftime('%Y-%m-%d %H:%M')
152                 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')
153
154     return templates.TemplateResponse("index.html", {"request": request, "domain": domain, "blocks": blocks, "reason": reason, "reverse": reverse, "info": info})
155
156 @app.get(fba.config["base_url"] + "/api/mutual")
157 def mutual(domains: list[str] = Query()):
158     """Return 200 if federation is open between the two, 4xx otherwise"""
159     fba.c.execute(
160         "SELECT block_level FROM blocks " \
161         "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
162         "AND block_level = 'reject' " \
163         "LIMIT 1",
164         {
165             "a": domains[0],
166             "b": domains[1],
167             "aw": "*." + domains[0],
168             "bw": "*." + domains[1],
169         },
170     )
171     res = fba.c.fetchone()
172
173     if res is not None:
174         # Blocks found
175         return responses.JSONResponse(status_code=418, content={})
176
177     # No known blocks
178     return responses.JSONResponse(status_code=200, content={})
179
180 @app.get(fba.config["base_url"] + "/rss")
181 def rss(request: Request, domain: str = None):
182     if domain != None:
183         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
184         punycode = domain.encode('idna').decode('utf-8')
185         fba.c.execute("SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_added DESC LIMIT 50",
186                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
187     else:
188         fba.c.execute("SELECT blocker, blocked, block_level, reason, first_added, last_seen FROM blocks ORDER BY first_added DESC LIMIT 50")
189
190     blocks = fba.c.fetchall()
191
192     result = []
193     for blocker, blocked, block_level, reason, first_added, last_seen in blocks:
194         first_added = utils.format_datetime(datetime.fromtimestamp(first_added))
195         if reason == None or reason == '':
196             reason = "No reason provided."
197         else:
198             reason = "Provided reason: '" + reason + "'"
199
200         result.append({
201             "blocker": blocker,
202             "blocked": blocked,
203             "block_level": block_level,
204             "reason": reason,
205             "first_added": first_added
206         })
207
208     timestamp = utils.format_datetime(datetime.now())
209
210     return templates.TemplateResponse("rss.xml", {
211         "request": request,
212         "timestamp": timestamp,
213         "domain": domain,
214         "blocks": result
215     }, headers={
216         "Content-Type": "application/rss+xml"
217     })
218
219 if __name__ == "__main__":
220     uvicorn.run("api:app", host="127.0.0.1", port=fba.config["port"], log_level=fba.config["log_level"])