]> git.mxchange.org Git - fba.git/blob - api.py
Additional query DoS mitigation
[fba.git] / api.py
1 import uvicorn
2 from fastapi import FastAPI, Request, HTTPException, responses
3 import sqlite3
4 from hashlib import sha256
5 from fastapi.templating import Jinja2Templates
6 from requests import get
7 from json import loads
8 from re import sub
9
# Load the runtime settings once at import time. "base_url" is used as the
# prefix of every route below and "port" is the port the server listens on.
# JSON is UTF-8 by specification, so the encoding is pinned explicitly rather
# than left to the platform's locale default.
with open("config.json", encoding="utf-8") as f:
    config = loads(f.read())
    base_url = config["base_url"]
    port = config["port"]
# Expose the generated API documentation under the same prefix as the routes.
app = FastAPI(docs_url=base_url+"/docs", redoc_url=base_url+"/redoc")
# Templates are loaded from the directory ".".
templates = Jinja2Templates(directory=".")
16
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of *domain* encoded as UTF-8."""
    hasher = sha256()
    hasher.update(domain.encode("utf-8"))
    return hasher.hexdigest()
19
@app.get(base_url+"/info")
def info():
    """Return summary counters for the block database.

    Returns:
        A dict with the number of rows in ``instances`` ("known_instances"),
        the number of those whose ``software`` is one of the listed
        platforms ("indexed_instances"), the number of rows in ``blocks``
        ("blocks_recorded") and a link to the source code.
    """
    conn = sqlite3.connect("blocks.db")
    try:
        c = conn.cursor()
        # A single statement with three scalar sub-selects yields exactly one row.
        c.execute("select (select count(domain) from instances), (select count(domain) from instances where software in ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), (select count(blocker) from blocks)")
        known, indexed, blocks = c.fetchone()
    finally:
        # Close the connection itself (not just the cursor) so that no
        # database handle is leaked per request, even if the query fails.
        conn.close()
    return {
        "known_instances": known,
        "indexed_instances": indexed,
        "blocks_recorded": blocks,
        "source_code": "https://git.kiwifarms.net/mint/fedi-block-api",
    }
33
@app.get(base_url+"/api")
def blocked(domain: str = None, reason: str = None):
    """Look up recorded blocks by blocked domain or by reason keyword.

    Args:
        domain: Domain to look up. When given it takes precedence over
            ``reason`` for the query (``reason`` is still validated).
        reason: Keyword searched for as a substring of the block reason.

    Returns:
        For a domain lookup: ``{"blocks": {level: [blocker, ...]},
        "reasons": {level: {blocker: reason}}, "wildcards": [blocker, ...]}``
        where "wildcards" lists blockers whose matching entry starts with
        ``*.``. For a reason lookup: ``{"blocks": {level: [{"blocker",
        "blocked", "reason"}, ...]}}``.

    Raises:
        HTTPException: 400 if no filter is given, if the keyword is shorter
            than three characters after wildcard stripping, or if the
            domain cannot be IDNA-encoded.
    """
    if domain is None and reason is None:
        raise HTTPException(status_code=400, detail="No filter specified")
    if reason is not None:
        # Drop LIKE wildcards so a client cannot widen the pattern, and
        # require a minimum length to keep the substring scan bounded.
        reason = sub("(%|_)", "", reason)
        if len(reason) < 3:
            raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")

    if domain is not None:
        # Validate before touching the database: the idna codec raises
        # UnicodeError for empty or over-long labels, which would otherwise
        # surface as a 500 with the connection left open.
        try:
            punycode = domain.encode("idna").decode("utf-8")
        except UnicodeError:
            raise HTTPException(status_code=400, detail="Invalid domain") from None
        # Replace the first label with "*" ("a.b.c" -> "*.b.c"); a domain
        # without any dot becomes "*." + domain.
        wildchar = "*." + domain.split(".", 1)[-1]
        query = "select blocker, blocked, block_level, reason from blocks where blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ?"
        params = (domain, "*." + domain, wildchar, get_hash(domain), punycode, "*." + punycode)
    else:
        query = "select blocker, blocked, reason, block_level from blocks where reason like ? and reason != ''"
        params = ("%"+reason+"%",)

    conn = sqlite3.connect("blocks.db")
    try:
        c = conn.cursor()
        c.execute(query, params)
        blocks = c.fetchall()
    finally:
        # Always release the database handle, even when the query raises.
        conn.close()

    result = {}
    if domain is not None:
        reasons = {}
        wildcards = []
        # Distinct loop names: the parameters must not be overwritten by rows.
        for blocker, blocked_entry, block_level, block_reason in blocks:
            result.setdefault(block_level, []).append(blocker)
            # The entry matched through a wildcard pattern rather than exactly.
            if blocked_entry.startswith("*."):
                wildcards.append(blocker)
            if block_reason != "":
                reasons.setdefault(block_level, {})[blocker] = block_reason
        return {"blocks": result, "reasons": reasons, "wildcards": wildcards}

    for blocker, blocked_entry, block_reason, block_level in blocks:
        result.setdefault(block_level, []).append(
            {"blocker": blocker, "blocked": blocked_entry, "reason": block_reason}
        )
    return {"blocks": result}
78
@app.get(base_url+"/")
def index(request: Request, domain: str = None, reason: str = None):
    """Render the HTML front page, optionally with lookup results.

    Without filters the page shows the counters from the ``/info`` endpoint;
    with ``domain`` (preferred) or ``reason`` it shows the result of the
    ``/api`` endpoint. Both are fetched over HTTP from this same server.

    Args:
        request: Incoming request, passed through to the template.
        domain: Domain to look up.
        reason: Reason keyword to search for.

    Returns:
        A redirect to "/" when a filter is present but empty, otherwise the
        rendered ``index.html`` template.

    Raises:
        HTTPException: Mirrors the status code and body of a failed
            internal request.
    """
    if domain == "" or reason == "":
        # NOTE(review): this redirects to "/" rather than base_url + "/";
        # confirm that is intended when base_url is not empty.
        return responses.RedirectResponse("/")

    api_root = f"http://127.0.0.1:{port}{base_url}"
    stats = None
    blocks = None
    if domain is None and reason is None:
        response = get(f"{api_root}/info")
        if not response.ok:
            raise HTTPException(status_code=response.status_code, detail=response.text)
        stats = response.json()
    else:
        # Let requests build the query string so characters such as "&",
        # "#", "+" or spaces in user input are percent-encoded instead of
        # altering or injecting parameters of the internal request.
        params = {"domain": domain} if domain is not None else {"reason": reason}
        response = get(f"{api_root}/api", params=params)
        if not response.ok:
            raise HTTPException(status_code=response.status_code, detail=response.text)
        blocks = response.json()
    return templates.TemplateResponse("index.html", {"request": request, "domain": domain, "blocks": blocks, "reason": reason, "info": stats})
99
if __name__ == "__main__":
    # Started as a script: serve the app on the loopback interface only,
    # using the port read from the configuration.
    uvicorn.run(
        "api:app",
        host="127.0.0.1",
        port=port,
        log_level="info",
    )