From e2fe58f6c38d407b3115537ead583c1be88408dd Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Mon, 12 Jun 2023 13:29:47 +0200 Subject: [PATCH] Continued: - tidyup + validate domains/reasons --- api.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/api.py b/api.py index 70d7211..4af491a 100644 --- a/api.py +++ b/api.py @@ -33,6 +33,8 @@ from fba import config from fba import fba from fba import network +from fba.helpers import tidyup + router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc") templates = Jinja2Templates(directory="templates") @@ -85,16 +87,25 @@ def api_blocked(domain: str = None, reason: str = None, reverse: str = None): raise HTTPException(status_code=400, detail="No filter specified") if reason is not None: - reason = re.sub("(%|_)", "", reason) + reason = re.sub("(%|_)", "", tidyup.reason(reason)) if len(reason) < 3: raise HTTPException(status_code=400, detail="Keyword is shorter than three characters") if domain is not None: + domain = tidyup.domain(domain) + if not validators.domain(domain): + raise HTTPException(status_code=500, detail="Invalid domain") + wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):]) punycode = domain.encode('idna').decode('utf-8') + fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC", (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode)) elif reverse is not None: + reverse = tidyup.domain(reverse) + if not validators.domain(reverse): + raise HTTPException(status_code=500, detail="Invalid domain") + fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse]) else: fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"]) @@ -206,6 +217,7 @@ def top(request: Request, domain: str = None, reason: str = None, reverse: str = response = None if domain is not None: + domain = tidyup.domain(domain) if not validators.domain(domain): raise HTTPException(status_code=500, detail="Invalid domain") @@ -213,6 +225,7 @@ def top(request: Request, domain: str = None, reason: str = None, reverse: str = elif reason is not None: response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}") elif reverse is not None: + reverse = tidyup.domain(reverse) if not validators.domain(reverse): raise HTTPException(status_code=500, detail="Invalid domain") @@ -221,6 +234,7 @@ def top(request: Request, domain: str = None, reason: str = None, reverse: str = if response is not None: if not response.ok: raise HTTPException(status_code=response.status_code, detail=response.text) + blocklist = response.json() for block_level in blocklist: for block in blocklist[block_level]: @@ -239,6 +253,8 @@ def top(request: Request, domain: str = None, reason: str = None, reverse: str = @router.get(config.get("base_url") + "/rss") def rss(request: Request, domain: str = None): if domain is not None: + domain = tidyup.domain(domain) + wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):]) punycode = domain.encode('idna').decode('utf-8') fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50", -- 2.39.5