from datetime import datetime
from email.utils import format_datetime
from pathlib import Path
-from urllib.parse import urlparse
import fastapi
from fastapi import Request, HTTPException, Query
from fba.helpers import blacklist
from fba.helpers import config
+from fba.helpers import domain as domain_helper
from fba.helpers import json as json_helper
from fba.helpers import tidyup
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def api_info():
- database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe', 'gotosocial', 'brighteon', 'wildebeest', 'bookwyrm')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
+ database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe', 'gotosocial', 'brighteon', 'wildebeest', 'bookwyrm', 'mitra', 'areionskey')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
row = database.cursor.fetchone()
return JSONResponse(status_code=200, content={
scores = list()
- for domain, score in database.cursor.fetchall():
+ for row in database.cursor.fetchall():
scores.append({
- "domain": domain,
- "score" : round(score)
+ "domain": row[0],
+ "score" : round(row[1]),
})
return JSONResponse(status_code=200, content=scores)
elif amount > config.get("api_limit"):
raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
- if mode in ("detection_mode", "software", "command"):
+ if mode in ("detection_mode", "software", "command", "origin"):
database.cursor.execute(
f"SELECT domain, origin, software, detection_mode, command, total_peers, total_blocks, first_seen, last_updated \
FROM instances \
)
elif mode in ["domain", "reverse"]:
domain = tidyup.domain(value)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# Tidy up domain name
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
# Fetch domain data
@router.get(config.get("base_url") + "/scoreboard")
def scoreboard(request: Request, mode: str, amount: int):
- response = None
-
- if mode == "blocker" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocker&amount={amount}")
- elif mode == "blocked" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocked&amount={amount}")
- elif mode == "reference" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=reference&amount={amount}")
- elif mode == "software" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=software&amount={amount}")
- elif mode == "command" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=command&amount={amount}")
- elif mode == "error_code" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=error_code&amount={amount}")
- elif mode == "detection_mode" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=detection_mode&amount={amount}")
- elif mode == "avg_peers" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=avg_peers&amount={amount}")
- elif mode == "obfuscator" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=obfuscator&amount={amount}")
- elif mode == "obfuscation" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=obfuscation&amount={amount}")
- elif mode == "block_level" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=block_level&amount={amount}")
- else:
- raise HTTPException(status_code=400, detail="No filter specified")
+ if mode == "":
+ raise HTTPException(status_code=400, detail="No mode specified")
+ elif amount <= 0:
+ raise HTTPException(status_code=500, detail="Invalid amount specified")
+
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode={mode}&amount={amount}")
if response is None:
raise HTTPException(status_code=500, detail="Could not determine scores")
def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
- elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
+ elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
# Tidy up domain name
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")