+ if mode == "blocker" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocker&amount={amount}")
+ elif mode == "blocked" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocked&amount={amount}")
+ elif mode == "reference" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=reference&amount={amount}")
+ elif mode == "software" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=software&amount={amount}")
+ elif mode == "command" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=command&amount={amount}")
+ elif mode == "error_code" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=error_code&amount={amount}")
+ else:
+ raise HTTPException(status_code=400, detail="No filter specified")
+
+ if response is None:
+ raise HTTPException(status_code=500, detail="Could not determine scores")
+ elif not response.ok:
+ raise HTTPException(status_code=response.status_code, detail=response.text)
+
+ return templates.TemplateResponse("views/scoreboard.html", {
+ "base_url" : config.get("base_url"),
+ "slogan" : config.get("slogan"),
+ "request" : request,
+ "scoreboard": True,
+ "mode" : mode,
+ "amount" : amount,
+ "scores" : network.json_from_response(response)
+ })
+
+@router.get(config.get("base_url") + "/")
+def index(request: Request):
+ # Get info
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+
+ if not response.ok:
+ raise HTTPException(status_code=response.status_code, detail=response.text)
+
+ return templates.TemplateResponse("views/index.html", {
+ "request": request,
+ "info" : response.json()
+ })
+
+@router.get(config.get("base_url") + "/top")
+def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
+ if domain == "" or reason == "" or reverse == "":
+ raise HTTPException(status_code=500, detail="Insufficient parameter provided")
+
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+
+ if not response.ok:
+ raise HTTPException(status_code=response.status_code, detail=response.text)
+
+ info = response.json()
+ response = None
+
+ if domain is not None:
+ domain = tidyup.domain(domain)
+ if not validators.domain(domain):
+ raise HTTPException(status_code=500, detail="Invalid domain")
+
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
+ elif reason is not None:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
+ elif reverse is not None:
+ reverse = tidyup.domain(reverse)
+ if not validators.domain(reverse):
+ raise HTTPException(status_code=500, detail="Invalid domain")
+
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
+
+ if response is not None:
+ if not response.ok:
+ raise HTTPException(status_code=response.status_code, detail=response.text)
+
+ blocklist = response.json()
+
+ for block_level in blocklist:
+ for block in blocklist[block_level]:
+ block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
+ block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
+
+ return templates.TemplateResponse("views/top.html", {
+ "request": request,
+ "domain" : domain,
+ "blocks" : blocklist,
+ "reason" : reason,
+ "reverse": reverse,
+ "info" : info
+ })
+
+@router.get(config.get("base_url") + "/rss")
+def rss(request: Request, domain: str = None):
+ if domain is not None:
+ domain = tidyup.domain(domain)
+
+ wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
+ punycode = domain.encode("idna").decode("utf-8")
+
+ fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT ?", [
+ domain,
+ "*." + domain, wildchar,
+ fba.get_hash(domain),
+ punycode,
+ "*." + punycode,
+ config.get("rss_limit")
+ ])
+ else:
+ fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT ?", [config.get("rss_limit")])
+
+ result = fba.cursor.fetchall()
+
+ blocklist = []
+ for blocker, blocked, block_level, reason, first_seen, last_seen in result:
+ first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
+ if reason is None or reason == "":
+ reason = "No reason provided."
+ else:
+ reason = "Provided reason: '" + reason + "'"
+
+ blocklist.append({
+ "blocker" : blocker,
+ "blocked" : blocked,
+ "block_level": block_level,
+ "reason" : reason,
+ "first_seen" : first_seen,
+ "last_seen" : last_seen,
+ })
+
+ return templates.TemplateResponse("rss.xml", {
+ "request" : request,
+ "timestamp": utils.format_datetime(datetime.now()),
+ "domain" : domain,
+ "hostname" : config.get("hostname"),
+ "blocks" : blocklist
+ }, headers={
+ "Content-Type": "routerlication/rss+xml"
+ })
+
+@router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
+def robots(request: Request):
+ return templates.TemplateResponse("robots.txt", {
+ "request" : request,
+ "base_url": config.get("base_url")
+ })
+
+if __name__ == "__main__":
+ uvicorn.run("api:router", host=config.get("host"), port=config.get("port"), log_level=config.get("log_level"))