from fba import fba
from fba import network
+from fba.helpers import tidyup
+
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
templates = Jinja2Templates(directory="templates")
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def api_info():
- fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
- known, indexed, blocklist, errorous = fba.cursor.fetchone()
+ fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
+ row = fba.cursor.fetchone()
return {
- "known_instances" : known,
- "indexed_instances" : indexed,
- "blocks_recorded" : blocklist,
- "errorous_instances": errorous,
+ "known_instances" : row[0],
+ "indexed_instances" : row[1],
+ "blocks_recorded" : row[2],
+ "errorous_instances": row[3],
"slogan" : config.get("slogan")
}
-@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def api_top(mode: str, amount: int):
+@router.get(config.get("base_url") + "/api/scoreboard.json", response_class=JSONResponse)
+def api_scoreboard(mode: str, amount: int):
if amount > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
elif mode == "error_code":
fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
+ elif mode == "avg_peers":
+ fba.cursor.execute("SELECT software, AVG(total_peers) AS sum FROM instances WHERE software IS NOT NULL GROUP BY software HAVING sum>0 ORDER BY sum DESC LIMIT ?", [amount])
else:
raise HTTPException(status_code=400, detail="No filter specified")
for domain, score in fba.cursor.fetchall():
scores.append({
"domain": domain,
- "score" : score
+ "score" : round(score)
})
return scores
raise HTTPException(status_code=400, detail="No filter specified")
if reason is not None:
- reason = re.sub("(%|_)", "", reason)
+ reason = re.sub("(%|_)", "", tidyup.reason(reason))
if len(reason) < 3:
raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
if domain is not None:
+ domain = tidyup.domain(domain)
+ if not validators.domain(domain.split("/")[0]):
+ raise HTTPException(status_code=500, detail="Invalid domain")
+
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
+
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
(domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
elif reverse is not None:
+ reverse = tidyup.domain(reverse)
+ if not validators.domain(reverse):
+ raise HTTPException(status_code=500, detail="Invalid domain")
+
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
else:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
response = None
if mode == "blocker" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocker&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocker&amount={amount}")
elif mode == "blocked" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocked&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=blocked&amount={amount}")
elif mode == "reference" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=reference&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=reference&amount={amount}")
elif mode == "software" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=software&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=software&amount={amount}")
elif mode == "command" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=command&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=command&amount={amount}")
elif mode == "error_code" and amount > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=error_code&amount={amount}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=error_code&amount={amount}")
+ elif mode == "avg_peers" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/scoreboard.json?mode=avg_peers&amount={amount}")
else:
raise HTTPException(status_code=400, detail="No filter specified")
response = None
if domain is not None:
- if not validators.domain(domain):
+ domain = tidyup.domain(domain)
+ if not validators.domain(domain.split("/")[0]):
raise HTTPException(status_code=500, detail="Invalid domain")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
elif reason is not None:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
elif reverse is not None:
+ reverse = tidyup.domain(reverse)
if not validators.domain(reverse):
raise HTTPException(status_code=500, detail="Invalid domain")
if response is not None:
if not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)
+
blocklist = response.json()
+
for block_level in blocklist:
for block in blocklist[block_level]:
block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
- block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
+ block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
return templates.TemplateResponse("views/top.html", {
"request": request,
@router.get(config.get("base_url") + "/rss")
def rss(request: Request, domain: str = None):
if domain is not None:
+ domain = tidyup.domain(domain)
+
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
- punycode = domain.encode('idna').decode('utf-8')
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
- (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
+ punycode = domain.encode("idna").decode("utf-8")
+
+ fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT ?", [
+ domain,
+ "*." + domain, wildchar,
+ fba.get_hash(domain),
+ punycode,
+ "*." + punycode,
+ config.get("rss_limit")
+ ])
else:
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT 50")
+ fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT ?", [config.get("rss_limit")])
result = fba.cursor.fetchall()
-
blocklist = []
- for blocker, blocked, block_level, reason, first_seen, last_seen in result:
- first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
- if reason is None or reason == '':
- reason = "No reason provided."
- else:
- reason = "Provided reason: '" + reason + "'"
+ for row in result:
blocklist.append({
- "blocker" : blocker,
- "blocked" : blocked,
- "block_level": block_level,
- "reason" : reason,
- "first_seen" : first_seen,
- "last_seen" : last_seen,
+ "blocker" : row[0],
+ "blocked" : row[1],
+ "block_level": row[2],
+ "reason" : "Provided reason: '" + row[3] + "'" if row[3] is not None and row[3] != "" else "No reason provided.",
+ "first_seen" : utils.format_datetime(datetime.fromtimestamp(row[4])),
+ "last_seen" : utils.format_datetime(datetime.fromtimestamp(row[5])),
})
return templates.TemplateResponse("rss.xml", {