from fba.http import network
+from fba.models import blocks
+
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
templates = Jinja2Templates(directory="templates")
return scores
@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
-def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
- if domain is None and reason is None and reverse is None:
- raise HTTPException(status_code=400, detail="No filter specified")
-
- if reason is not None:
- reason = re.sub("(%|_)", "", tidyup.reason(reason))
- if len(reason) < 3:
- raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
-
- if domain is not None:
- domain = tidyup.domain(domain)
- if not validators.domain(domain.split("/")[0]):
- raise HTTPException(status_code=500, detail="Invalid domain")
+def api_index(request: Request, mode: str, value: str, amount: int):
+ if mode is None or value is None or amount is None:
+ raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount > 500:
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
+
+ domain = whildchar = punycode = reason = None
+
+ if mode == "block_level":
+ database.cursor.execute(
+ "SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE block_level = ? LIMIT ?", [value, amount]
+ )
+ elif mode in ["domain", "reverse"]:
+ domain = tidyup.domain(value)
+ if not utils.is_domain_wanted(domain):
+ raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
+ elif mode == "reason":
+ reason = re.sub("(%|_)", "", tidyup.reason(value))
+ if len(reason) < 3:
+ raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
- database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
- (domain, "*." + domain, wildchar, utils.get_hash(domain), punycode, "*." + punycode))
- elif reverse is not None:
- reverse = tidyup.domain(reverse)
- if not validators.domain(reverse):
- raise HTTPException(status_code=500, detail="Invalid domain")
-
- database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
- else:
- database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
+ if mode == "domain":
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen \
+FROM blocks \
+WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC LIMIT ?",
+ [
+ domain,
+ "*." + domain,
+ wildchar,
+ utils.get_hash(domain),
+ punycode,
+ "*." + punycode,
+ amount
+ ]
+ )
+ elif mode == "reverse":
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen \
+FROM blocks \
+WHERE blocker = ? OR blocker = ? OR blocker = ? OR blocker = ? OR blocker = ? OR blocker = ? \
+ORDER BY first_seen ASC \
+LIMIT ?", [
+ domain,
+ "*." + domain,
+ wildchar,
+ utils.get_hash(domain),
+ punycode,
+ "*." + punycode,
+ amount
+ ])
+ elif mode == "reason":
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen \
+FROM blocks \
+WHERE reason LIKE ? AND reason != '' \
+ORDER BY first_seen ASC \
+LIMIT ?", [
+ "%" + reason + "%",
+ amount
+ ])
blocklist = database.cursor.fetchall()
})
@router.get(config.get("base_url") + "/top")
-def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
- if domain == "" or reason == "" or reverse == "":
- raise HTTPException(status_code=500, detail="Insufficient parameter provided")
-
+def top(request: Request, mode: str, value: str, amount: int = 500):
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
if not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)
+ elif mode == "" or value == "" or amount == 0:
+ raise HTTPException(status_code=500, detail="Parameter mode, value and amount must always be set")
+ elif amount > 500:
+ raise HTTPException(status_code=500, detail=f"amount='{amount}' is to big")
info = response.json()
response = None
+ blocklist = list()
- if domain is not None:
- domain = tidyup.domain(domain)
- if not validators.domain(domain.split("/")[0]):
- raise HTTPException(status_code=500, detail="Invalid domain")
+ if mode == "block_level" and not blocks.is_valid_level(value):
+ raise HTTPException(status_code=500, detail="Invalid block level provided")
+ elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
+ raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
- elif reason is not None:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
- elif reverse is not None:
- reverse = tidyup.domain(reverse)
- if not validators.domain(reverse):
- raise HTTPException(status_code=500, detail="Invalid domain")
-
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?mode={mode}&value={value}&amount={amount}")
if response is not None:
- if not response.ok:
- raise HTTPException(status_code=response.status_code, detail=response.text)
-
blocklist = response.json()
- for block_level in blocklist:
- for block in blocklist[block_level]:
- block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
- block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
+ for block_level in blocklist:
+ for block in blocklist[block_level]:
+ block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
+ block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
return templates.TemplateResponse("views/top.html", {
"request": request,
- "domain" : domain,
+ "mode" : mode if response is not None else None,
+ "value" : value if response is not None else None,
+ "amount" : amount if response is not None else None,
"blocks" : blocklist,
- "reason" : reason,
- "reverse": reverse,
- "info" : info
+ "info" : info,
})
@router.get(config.get("base_url") + "/rss")