2 from fastapi import FastAPI, Request, HTTPException, responses
4 from hashlib import sha256
5 from fastapi.templating import Jinja2Templates
6 from requests import get
# Load runtime settings once at import time. The file is opened as UTF-8
# explicitly so that parsing does not depend on the platform's default
# text encoding.
with open("config.json", encoding="utf-8") as f:
    config = loads(f.read())

# URL prefix under which every route, including the docs pages, is mounted.
base_url = config["base_url"]
# NOTE(review): `port` is referenced further down in this file, but its
# assignment is not visible in this excerpt - confirm it is still defined.

# Interactive API docs are served below the same prefix as the routes.
app = FastAPI(docs_url=base_url+"/docs", redoc_url=base_url+"/redoc")
# Templates are looked up relative to the current working directory.
templates = Jinja2Templates(directory=".")
def get_hash(domain: str) -> str:
    """Return the SHA-256 hex digest of *domain*.

    The string is encoded as UTF-8 before hashing, and the digest is
    returned as 64 lowercase hexadecimal characters.
    """
    return sha256(domain.encode("utf-8")).hexdigest()
# GET <base_url>/info - summary counters read from the SQLite file blocks.db.
# NOTE(review): this excerpt has gaps. The embedded source numbers skip 21,
# 23, 26, 27 and 31, so the `def` header, the assignment of `c`, and the
# statement wrapping the three key/value lines are not visible here.
# Presumably `c` is a cursor on `conn` and the pairs form the returned dict -
# confirm against the complete file before editing.
# NOTE(review): no close of the connection is visible in this excerpt.
20 @app.get(base_url+"/info")
22 conn = sqlite3.connect("blocks.db")
# One statement with three scalar subqueries: count(domain) over `instances`,
# count(domain) over the `instances` rows whose `software` is one of the five
# listed values, and count(blocker) over `blocks`.
24 c.execute("select (select count(domain) from instances), (select count(domain) from instances where software in ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica')), (select count(blocker) from blocks)")
# The statement yields a single row; unpack its three columns positionally.
25 known, indexed, blocks = c.fetchone()
# Response keys, in the same order as the subqueries above.
28 "known_instances": known,
29 "indexed_instances": indexed,
30 "blocks_recorded": blocks
# GET <base_url>/api - look up block records either by blocked domain or by a
# keyword contained in the block reason.
#
# Parameters (query string):
#   domain: domain to look up among the `blocked` column values.
#   reason: keyword searched for inside the `reason` column.
# Raises HTTPException(400) when neither filter is given, and with the
# "Keyword is shorter than three characters" detail for a too-short keyword.
#
# NOTE(review): this excerpt has gaps. The embedded source numbers skip 37,
# 39, 42, 43, 48, 50-56, 60, 64, 67, 70 and 74. The conditions guarding the
# second `raise`, the choice between the two queries, the assignments of `c`,
# `blocks`, `result`, `reasons` and `wildcards`, and the `else` arms of the
# grouping code are therefore not visible. Confirm them against the complete
# file before changing anything here.
33 @app.get(base_url+"/api")
34 def blocked(domain: str = None, reason: str = None):
# At least one of the two filters is required.
35 if domain == None and reason == None:
36 raise HTTPException(status_code=400, detail="No filter specified")
# Drop `%` and `_` from the keyword; it is later embedded in a LIKE pattern,
# where those two characters act as wildcards.
38 reason = sub("(%|_)", "", reason)
40 raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
41 conn = sqlite3.connect("blocks.db")
# "*." plus the domain with its leftmost label removed ("a.b.c" -> "*.b.c").
# For a domain without any dot the slice is [-0:], i.e. the whole list, so
# the result is "*." + domain.
44 wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# IDNA (punycode) spelling of the domain, as text.
45 punycode = domain.encode('idna').decode('utf-8')
# Match `blocked` against six spellings: the domain itself, "*." + domain,
# the parent wildcard, the SHA-256 hex digest from get_hash(), the punycode
# form, and "*." + punycode. All values are bound as parameters.
46 c.execute("select blocker, blocked, block_level, reason from blocks where blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ? or blocked = ?",
47 (domain, "*." + domain, wildchar, get_hash(domain), punycode, "*." + punycode))
# Keyword search: substring match on non-empty reasons. Note that the column
# order differs from the domain query (reason before block_level).
49 c.execute("select blocker, blocked, reason, block_level from blocks where reason like ? and reason != ''", ("%"+reason+"%",))
# Domain-lookup result shaping. The first column of that query is `blocker`,
# but it is unpacked into the name `domain`, rebinding the parameter; the
# names `blocked` and `reason` are rebound per row as well.
57 for domain, blocked, block_level, reason in blocks:
# Group the blocking instances by block level.
58 if block_level in result:
59 result[block_level].append(domain)
61 result[block_level] = [domain]
# True when the stored entry is itself of the "*.<rest>" wildcard form, in
# which case the blocking instance is also listed under "wildcards".
62 if blocked == "*." + ".".join(blocked.split(".")[-blocked.count("."):]):
63 wildcards.append(domain)
# Per block level, map each blocking instance to the reason it recorded.
65 if block_level in reasons:
66 reasons[block_level][domain] = reason
68 reasons[block_level] = {domain: reason}
69 return {"blocks": result, "reasons": reasons, "wildcards": wildcards}
# Keyword-search result shaping: full records grouped by block level.
71 for blocker, blocked, reason, block_level in blocks:
72 if block_level in result:
73 result[block_level].append({"blocker": blocker, "blocked": blocked, "reason": reason})
75 result[block_level] = [{"blocker": blocker, "blocked": blocked, "reason": reason}]
76 return {"blocks": result}
# GET <base_url>/ - HTML front page. It fetches its data over HTTP from this
# application's own /info and /api routes on 127.0.0.1:<port> and renders
# index.html with the result.
#
# Parameters:
#   request: the incoming request, passed on to the template context.
#   domain / reason: optional filters forwarded to the /api route.
# Raises HTTPException carrying the status code and body text of a failed
# upstream call.
#
# NOTE(review): this excerpt has gaps. The embedded source numbers skip 82,
# 83, 86, 88, 89, 91, 93 and 94, so the conditions guarding both `raise`
# statements, the branch selecting the domain or reason request, and any
# default assignment of `info` / `blocks` are not visible. Confirm them
# against the complete file before editing.
78 @app.get(base_url+"/")
79 def index(request: Request, domain: str = None, reason: str = None):
# An empty filter value is answered with a redirect.
# NOTE(review): the target is the literal "/", not base_url + "/".
80 if domain == "" or reason == "":
81 return responses.RedirectResponse("/")
# No filter at all: fetch the counters from the /info route.
84 if domain == None and reason == None:
85 info = get(f"http://127.0.0.1:{port}{base_url}/info")
87 raise HTTPException(status_code=info.status_code, detail=info.text)
# NOTE(review): `domain` and `reason` are interpolated into the query string
# without URL-encoding; requests' `params=` argument would encode them.
90 blocks = get(f"http://127.0.0.1:{port}{base_url}/api?domain={domain}")
92 blocks = get(f"http://127.0.0.1:{port}{base_url}/api?reason={reason}")
95 raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
# Replace the response object with its decoded JSON body.
96 blocks = blocks.json()
97 return templates.TemplateResponse("index.html", {"request": request, "domain": domain, "blocks": blocks, "reason": reason, "info": info})
if __name__ == "__main__":
    # Start uvicorn only when this file is executed directly. "api:app" is
    # uvicorn's import-string form: attribute `app` of module `api`. The
    # server listens on the loopback address only.
    # NOTE(review): `port` is not assigned anywhere in the visible excerpt -
    # confirm it is defined at module level.
    uvicorn.run("api:app", host="127.0.0.1", port=port, log_level="info")