from fba import utils
from fba.helpers import config
+from fba.helpers import json as json_helper
from fba.helpers import tidyup
-from fba.http import network
-
from fba.models import blocks
+from fba.models import instances
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
router.mount(
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def api_info():
- database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
+ database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
row = database.cursor.fetchone()
return {
return scores
-@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
+@router.get(config.get("base_url") + "/api/list.json", response_class=JSONResponse)
+def api_list(request: Request, mode: str, value: str, amount: int):
+ if mode is None or value is None or amount is None:
+ raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount > config.get("api_limit"):
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
+
+ domain = wildchar = punycode = reason = None
+
+ if mode in ("detection_mode", "software", "command"):
+ database.cursor.execute(
+ f"SELECT domain, origin, software, detection_mode, command, total_peers, total_blocks, first_seen, last_updated \
+FROM instances \
+WHERE {mode} = ? \
+ORDER BY domain \
+LIMIT ?", [value, amount]
+ )
+
+ domainlist = database.cursor.fetchall()
+
+ return domainlist
+
+@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
def api_index(request: Request, mode: str, value: str, amount: int):
if mode is None or value is None or amount is None:
raise HTTPException(status_code=500, detail="No filter specified")
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
- punycode = domain.encode('idna').decode('utf-8')
+ punycode = domain.encode("idna").decode("utf-8")
elif mode == "reason":
reason = re.sub("(%|_)", "", tidyup.reason(value))
if len(reason) < 3:
if mode == "domain":
database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen \
FROM blocks \
-WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC LIMIT ?",
+WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? \
+ORDER BY block_level ASC, first_seen ASC \
+LIMIT ?",
[
domain,
"*." + domain,
return result
+@router.get(config.get("base_url") + "/api/domain.json", response_class=JSONResponse)
+def api_domain(domain: str):
+ # Tidy up domain name
+ domain = tidyup.domain(domain)
+
+ if not utils.is_domain_wanted(domain):
+ raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
+
+ # Fetch domain data
+ database.cursor.execute("SELECT * FROM instances WHERE domain = ? LIMIT 1", [domain])
+ domain_data = database.cursor.fetchone()
+
+ if domain_data is None:
+ raise HTTPException(status_code=404, detail=f"domain='{domain}' not found")
+
+ return domain_data
+
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
def api_mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
"bw": "*." + domains[1],
},
)
- response = database.cursor.fetchone()
- if response is not None:
+ if database.cursor.fetchone() is not None:
# Blocks found
return JSONResponse(status_code=418, content={})
"scoreboard": True,
"mode" : mode,
"amount" : amount,
- "scores" : network.json_from_response(response)
+ "scores" : json_helper.from_response(response)
})
@router.get(config.get("base_url") + "/")
"slogan" : config.get("slogan"),
})
-@router.get(config.get("base_url") + "/top")
-def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+@router.get(config.get("base_url") + "/list")
+def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+ if mode == "detection_mode" and not instances.valid(value, "detection_mode"):
+ raise HTTPException(status_code=500, detail="Invalid detection mode provided")
- if not response.ok:
- raise HTTPException(status_code=response.status_code, detail=response.text)
- elif mode == "" or value == "" or amount == 0:
- raise HTTPException(status_code=500, detail="Parameter mode, value and amount must always be set")
- elif amount > config.get("api_limit"):
- raise HTTPException(status_code=500, detail=f"amount='{amount}' is to big")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/list.json?mode={mode}&value={value}&amount={amount}")
- info = response.json()
- response = None
- blocklist = list()
+ domainlist = list()
+ if response is not None and response.ok:
+ domainlist = response.json()
+ format = config.get("timestamp_format")
+ for row in domainlist:
+ row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(format)
+ row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(format)
+
+ return templates.TemplateResponse("views/list.html", {
+ "request" : request,
+ "mode" : mode if response is not None else None,
+ "value" : value if response is not None else None,
+ "amount" : amount if response is not None else None,
+ "found" : len(domainlist),
+ "domainlist": domainlist,
+ "slogan" : config.get("slogan"),
+ "theme" : config.get("theme"),
+ })
- if mode == "block_level" and not blocks.is_valid_level(value):
+@router.get(config.get("base_url") + "/top")
+def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+ if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?mode={mode}&value={value}&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
- if response is not None:
+ found = 0
+ blocklist = list()
+ if response is not None and response.ok:
blocklist = response.json()
- found = 0
- for block_level in blocklist:
- for block in blocklist[block_level]:
- block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
- block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
- found = found + 1
+ format = config.get("timestamp_format")
+ for block_level in blocklist:
+ for block in blocklist[block_level]:
+ block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(format)
+ block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(format)
+ found = found + 1
return templates.TemplateResponse("views/top.html", {
"request" : request,
"amount" : amount if response is not None else None,
"found" : found,
"blocklist": blocklist,
- "info" : info,
+ "slogan" : config.get("slogan"),
"theme" : config.get("theme"),
})
if not utils.is_domain_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
- # Fetch domain data
- database.cursor.execute("SELECT * FROM instances WHERE domain = ? LIMIT 1", [domain])
- domain_data = database.cursor.fetchone()
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")
+
+ if not response.ok or response.status_code > 200 or response.text.strip() == "":
+ raise HTTPException(status_code=response.status_code, detail=response.reason)
+
+ domain_data = response.json()
# Format timestamps
format = config.get("timestamp_format")
# Generic
instance[key] = domain_data[key]
- print(domain_data.keys())
return templates.TemplateResponse("views/infos.html", {
"request" : request,
"domain" : domain,