from datetime import datetime
from email.utils import format_datetime
from pathlib import Path
-from urllib.parse import urlparse
import fastapi
from fastapi import Request, HTTPException, Query
from fba.helpers import blacklist
from fba.helpers import config
+from fba.helpers import domain as domain_helper
from fba.helpers import json as json_helper
from fba.helpers import tidyup
scores = list()
- for domain, score in database.cursor.fetchall():
+ for row in database.cursor.fetchall():
scores.append({
- "domain": domain,
- "score" : round(score)
+ "domain": row[0],
+ "score" : round(row[1]),
})
return JSONResponse(status_code=200, content=scores)
elif amount > config.get("api_limit"):
raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
- if mode in ("detection_mode", "software", "command"):
+ if mode in ("detection_mode", "software", "command", "origin"):
database.cursor.execute(
f"SELECT domain, origin, software, detection_mode, command, total_peers, total_blocks, first_seen, last_updated \
FROM instances \
)
elif mode in ["domain", "reverse"]:
domain = tidyup.domain(value)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# Tidy up domain name
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
# Fetch domain data
@router.get(config.get("base_url") + "/scoreboard")
def scoreboard(request: Request, mode: str, amount: int):
- response = None
-
- if mode == "blocker" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocker&amount={amount}")
- elif mode == "blocked" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocked&amount={amount}")
- elif mode == "reference" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=reference&amount={amount}")
- elif mode == "software" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=software&amount={amount}")
- elif mode == "command" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=command&amount={amount}")
- elif mode == "error_code" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=error_code&amount={amount}")
- elif mode == "detection_mode" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=detection_mode&amount={amount}")
- elif mode == "avg_peers" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=avg_peers&amount={amount}")
- elif mode == "obfuscator" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=obfuscator&amount={amount}")
- elif mode == "obfuscation" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=obfuscation&amount={amount}")
- elif mode == "block_level" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=block_level&amount={amount}")
- else:
- raise HTTPException(status_code=400, detail="No filter specified")
+ if mode == "":
+ raise HTTPException(status_code=400, detail="No mode specified")
+ elif amount <= 0:
+ raise HTTPException(status_code=500, detail="Invalid amount specified")
+
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode={mode}&amount={amount}")
if response is None:
raise HTTPException(status_code=500, detail="Could not determine scores")
def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
- elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
+ elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
# Tidy up domain name
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")