]> git.mxchange.org Git - fba.git/blob - api.py
RSS feed for latest blocks
[fba.git] / api.py
1 import uvicorn
2 from fastapi import FastAPI, Request, HTTPException, responses, Query
3 import sqlite3
4 from hashlib import sha256
5 from fastapi.templating import Jinja2Templates
6 from requests import get
7 from json import loads
8 from re import sub
9 from datetime import datetime
10 from email import utils
11
12 with open("config.json") as f:
13     config = loads(f.read())
14     base_url = config["base_url"]
15     port = config["port"]
16 app = FastAPI(docs_url=base_url+"/docs", redoc_url=base_url+"/redoc")
17 templates = Jinja2Templates(directory=".")
18
19 def get_hash(domain: str) -> str:
20     return sha256(domain.encode("utf-8")).hexdigest()
21
22 @app.get(base_url+"/info")
23 def info():
24     conn = sqlite3.connect("blocks.db")
25     c = conn.cursor()
26     c.execute("select (select count(domain) from instances), (select count(domain) from instances where software in ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), (select count(blocker) from blocks)")
27     known, indexed, blocks = c.fetchone()
28     c.close()
29     return {
30         "known_instances": known,
31         "indexed_instances": indexed,
32         "blocks_recorded": blocks
33     }
34
35 @app.get(base_url+"/top")
36 def top(blocked: int = None, blockers: int = None):
37     conn = sqlite3.connect("blocks.db")
38     c = conn.cursor()
39     if blocked == None and blockers == None:
40         raise HTTPException(status_code=400, detail="No filter specified")
41     elif blocked != None:
42         if blocked > 500:
43             raise HTTPException(status_code=400, detail="Too many results")
44         c.execute("select blocked, count(blocked) from blocks where block_level = 'reject' group by blocked order by count(blocked) desc limit ?", (blocked,))
45     elif blockers != None:
46         if blockers > 500:
47             raise HTTPException(status_code=400, detail="Too many results")
48         c.execute("select blocker, count(blocker) from blocks where block_level = 'reject' group by blocker order by count(blocker) desc limit ?", (blockers,))
49     scores = c.fetchall()
50     c.close()
51
52     scoreboard = []
53     print(scores)
54     for domain, highscore in scores:
55         scoreboard.append({"domain": domain, "highscore": highscore})
56
57     return scoreboard
58
59 @app.get(base_url+"/api")
60 def blocked(domain: str = None, reason: str = None, reverse: str = None):
61     if domain == None and reason == None and reverse == None:
62         raise HTTPException(status_code=400, detail="No filter specified")
63     if reason != None:
64         reason = sub("(%|_)", "", reason)
65         if len(reason) < 3:
66             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
67     conn = sqlite3.connect("blocks.db")
68     c = conn.cursor()
69     if domain != None:
70         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
71         punycode = domain.encode('idna').decode('utf-8')
72         c.execute("select blocker, blocked, block_level, reason, first_added, last_seen from blocks where blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? order by first_added asc",
73                   (domain, "*." + domain, wildchar, get_hash(domain), punycode, "*." + punycode))
74     elif reverse != None:
75         c.execute("select blocker, blocked, block_level, reason, first_added, last_seen from blocks where blocker = ? order by first_added asc", (reverse,))
76     else:
77         c.execute("select blocker, blocked, block_level, reason, first_added, last_seen from blocks where reason like ? and reason != '' order by first_added asc", ("%"+reason+"%",))
78     blocks = c.fetchall()
79     c.close()
80
81     result = {}
82     for blocker, blocked, block_level, reason, first_added, last_seen in blocks:
83         entry = {"blocker": blocker, "blocked": blocked, "reason": reason, "first_added": first_added, "last_seen": last_seen}
84         if block_level in result:
85             result[block_level].append(entry)
86         else:
87             result[block_level] = [entry]
88
89     return result
90
91 @app.get(base_url+"/scoreboard")
92 def index(request: Request, blockers: int = None, blocked: int = None):
93     if blockers == None and blocked == None:
94         raise HTTPException(status_code=400, detail="No filter specified")
95     elif blockers != None:
96         scores = get(f"http://127.0.0.1:{port}{base_url}/top?blockers={blockers}")
97     elif blocked != None:
98         scores = get(f"http://127.0.0.1:{port}{base_url}/top?blocked={blocked}")
99     if scores != None:
100         if not scores.ok:
101             raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
102         scores = scores.json()
103     return templates.TemplateResponse("index.html", {"request": request, "scoreboard": True, "blockers": blockers, "blocked": blocked, "scores": scores})
104
105 @app.get(base_url+"/")
106 def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
107     if domain == "" or reason == "" or reverse == "":
108         return responses.RedirectResponse("/")
109     info = None
110     blocks = None
111     if domain == None and reason == None and reverse == None:
112         info = get(f"http://127.0.0.1:{port}{base_url}/info")
113         if not info.ok:
114             raise HTTPException(status_code=info.status_code, detail=info.text)
115         info = info.json()
116     elif domain != None:
117         blocks = get(f"http://127.0.0.1:{port}{base_url}/api?domain={domain}")
118     elif reason != None:
119         blocks = get(f"http://127.0.0.1:{port}{base_url}/api?reason={reason}")
120     elif reverse != None:
121         blocks = get(f"http://127.0.0.1:{port}{base_url}/api?reverse={reverse}")
122     if blocks != None:
123         if not blocks.ok:
124             raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
125         blocks = blocks.json()
126         for block_level in blocks:
127             for block in blocks[block_level]:
128                 block["first_added"] = datetime.utcfromtimestamp(block["first_added"]).strftime('%Y-%m-%d %H:%M')
129                 block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime('%Y-%m-%d %H:%M')
130
131     return templates.TemplateResponse("index.html", {"request": request, "domain": domain, "blocks": blocks, "reason": reason, "reverse": reverse, "info": info})
132
133 @app.get(base_url+"/api/mutual")
134 def mutual(domains: list[str] = Query()):
135     """Return 200 if federation is open between the two, 4xx otherwise"""
136     conn = sqlite3.connect('blocks.db')
137     c = conn.cursor()
138     c.execute(
139         "SELECT block_level FROM blocks " \
140         "WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
141         "AND block_level = 'reject' " \
142         "LIMIT 1",
143         {
144             "a": domains[0],
145             "b": domains[1],
146             "aw": "*." + domains[0],
147             "bw": "*." + domains[1],
148         },
149     )
150     res = c.fetchone()
151     c.close()
152     if res is not None:
153         # Blocks found
154         return responses.JSONResponse(status_code=418, content={})
155     # No known blocks
156     return responses.JSONResponse(status_code=200, content={})
157
158 @app.get(base_url+"/rss")
159 def rss(request: Request, domain: str = None):
160     conn = sqlite3.connect("blocks.db")
161     c = conn.cursor()
162     if domain != None:
163         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
164         punycode = domain.encode('idna').decode('utf-8')
165         c.execute("select blocker, blocked, block_level, reason, first_added, last_seen from blocks where blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? order by first_added desc limit 50",
166                   (domain, "*." + domain, wildchar, get_hash(domain), punycode, "*." + punycode))
167     else:
168         c.execute("select blocker, blocked, block_level, reason, first_added, last_seen from blocks order by first_added desc limit 50")
169     blocks = c.fetchall()
170     c.close()
171
172     result = []
173     for blocker, blocked, block_level, reason, first_added, last_seen in blocks:
174         first_added = utils.format_datetime(datetime.fromtimestamp(first_added))
175         if reason == None or reason == '':
176             reason = "No reason provided."
177         else:
178             reason = "Provided reason: '" + reason + "'"
179         result.append({"blocker": blocker, "blocked": blocked, "block_level": block_level, "reason": reason, "first_added": first_added})
180
181     timestamp = utils.format_datetime(datetime.now())
182
183     return templates.TemplateResponse("rss.xml", {"request": request, "timestamp": timestamp, "domain": domain, "blocks": result}, headers={"Content-Type": "application/rss+xml"})
184
185 if __name__ == "__main__":
186     uvicorn.run("api:app", host="127.0.0.1", port=port, log_level="info")