]> git.mxchange.org Git - fba.git/blob - api.py
Prevent single-character search DoS
[fba.git] / api.py
1 import uvicorn
2 from fastapi import FastAPI, Request, HTTPException, responses
3 import sqlite3
4 from hashlib import sha256
5 from fastapi.templating import Jinja2Templates
6 from requests import get
7 from json import loads
8
9 with open("config.json") as f:
10     config = loads(f.read())
11     base_url = config["base_url"]
12     port = config["port"]
13 app = FastAPI(docs_url=base_url+"/docs", redoc_url=base_url+"/redoc")
14 templates = Jinja2Templates(directory=".")
15
16 def get_hash(domain: str) -> str:
17     return sha256(domain.encode("utf-8")).hexdigest()
18
19 @app.get(base_url+"/info")
20 def info():
21     conn = sqlite3.connect("blocks.db")
22     c = conn.cursor()
23     c.execute("select (select count(domain) from instances), (select count(domain) from instances where software in ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), (select count(blocker) from blocks)")
24     known, indexed, blocks = c.fetchone()
25     c.close()
26     return {
27         "known_instances": known,
28         "indexed_instances": indexed,
29         "blocks_recorded": blocks,
30         "source_code": "https://git.kiwifarms.net/mint/fedi-block-api",
31     }
32
33 @app.get(base_url+"/api")
34 def blocked(domain: str = None, reason: str = None):
35     if domain == None and reason == None:
36         raise HTTPException(status_code=400, detail="No filter specified")
37     if domain == None and reason == None:
38     c = conn.cursor()
39     if domain != None:
40         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
41         punycode = domain.encode('idna').decode('utf-8')
42         c.execute("select blocker, blocked, block_level, reason from blocks where blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ?",
43                   (domain, "*." + domain, wildchar, get_hash(domain), punycode, "*." + punycode))
44     else:
45         if len(reason) < 3:
46             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
47         else:
48             c.execute("select blocker, blocked, reason, block_level from blocks where reason like ? and reason != ''", ("%"+reason+"%",))
49     blocks = c.fetchall()
50     conn.close()
51
52     result = {}
53     reasons = {}
54     wildcards = []
55     if domain != None:
56         for domain, blocked, block_level, reason in blocks:
57             if block_level in result:
58                 result[block_level].append(domain)
59             else:
60                 result[block_level] = [domain]
61             if blocked == "*." + ".".join(blocked.split(".")[-blocked.count("."):]):
62                 wildcards.append(domain)
63             if reason != "":
64                 if block_level in reasons:
65                     reasons[block_level][domain] = reason
66                 else:
67                     reasons[block_level] = {domain: reason}
68         return {"blocks": result, "reasons": reasons, "wildcards": wildcards}
69
70     for blocker, blocked, reason, block_level in blocks:
71         if block_level in result:
72             result[block_level].append({"blocker": blocker, "blocked": blocked, "reason": reason})
73         else:
74             result[block_level] = [{"blocker": blocker, "blocked": blocked, "reason": reason}]
75     return {"blocks": result}
76
77 @app.get(base_url+"/")
78 def index(request: Request, domain: str = None, reason: str = None):
79     if domain == "" or reason == "":
80         return responses.RedirectResponse("/")
81     info = None
82     blocks = None
83     if domain == None and reason == None:
84         info = get(f"http://127.0.0.1:{port}{base_url}/info")
85         if not info.ok:
86             raise HTTPException(status_code=info.status_code, detail=info.text)
87         info = info.json()
88     elif domain != None:
89         blocks = get(f"http://127.0.0.1:{port}{base_url}/api?domain={domain}")
90     elif reason != None:
91         blocks = get(f"http://127.0.0.1:{port}{base_url}/api?reason={reason}")
92     if blocks != None:
93         if not blocks.ok:
94             raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
95         blocks = blocks.json()
96     return templates.TemplateResponse("index.html", {"request": request, "domain": domain, "blocks": blocks, "reason": reason, "info": info})
97
98 if __name__ == "__main__":
99     uvicorn.run("api:app", host="127.0.0.1", port=port, log_level="info")