# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+from datetime import datetime
+from email import utils
+
+import re
+
from fastapi import Request, HTTPException, Query
from fastapi.responses import JSONResponse
from fastapi.responses import PlainTextResponse
from fastapi.templating import Jinja2Templates
-from datetime import datetime
-from email import utils
import fastapi
import uvicorn
import requests
-import re
import validators
-from fba import *
+from fba import config
+from fba import fba
+from fba import network
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
templates = Jinja2Templates(directory="templates")
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
-def info():
- fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
+def api_info():
+ fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
known, indexed, blocklist, errorous = fba.cursor.fetchone()
return {
}
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
- if blocked != None:
- if blocked > 500:
- raise HTTPException(status_code=400, detail="Too many results")
- fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
- elif blockers != None:
- if blockers > 500:
- raise HTTPException(status_code=400, detail="Too many results")
- fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
- elif reference != None:
- if reference > 500:
- raise HTTPException(status_code=400, detail="Too many results")
- fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
- elif software != None:
- if software > 500:
- raise HTTPException(status_code=400, detail="Too many results")
- fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
- elif originator != None:
- if originator > 500:
- raise HTTPException(status_code=400, detail="Too many results")
- fba.cursor.execute("SELECT originator, COUNT(domain) FROM instances WHERE originator IS NOT NULL GROUP BY originator ORDER BY COUNT(domain) DESC, originator ASC LIMIT ?", [originator])
- elif error_code != None:
- if error_code > 500:
- raise HTTPException(status_code=400, detail="Too many results")
- fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
+def api_top(mode: str, amount: int):
+ if amount > 500:
+ raise HTTPException(status_code=400, detail="Too many results")
+
+ if mode == "blocked":
+ fba.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
+ elif mode == "blocker":
+ fba.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
+ elif mode == "reference":
+ fba.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
+ elif mode == "software":
+ fba.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
+ elif mode == "command":
+ fba.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
+ elif mode == "error_code":
+ fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
else:
raise HTTPException(status_code=400, detail="No filter specified")
- scores = fba.cursor.fetchall()
-
- scoreboard = []
+ scores = list()
- for domain, highscore in scores:
- scoreboard.append({
- "domain" : domain,
- "highscore": highscore
+ for domain, score in fba.cursor.fetchall():
+ scores.append({
+ "domain": domain,
+ "score" : score
})
- return scoreboard
+ return scores
@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
-def blocked(domain: str = None, reason: str = None, reverse: str = None):
- if domain == None and reason == None and reverse == None:
+def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
+ if domain is None and reason is None and reverse is None:
raise HTTPException(status_code=400, detail="No filter specified")
- if reason != None:
+ if reason is not None:
reason = re.sub("(%|_)", "", reason)
if len(reason) < 3:
raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
- if domain != None:
+ if domain is not None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
(domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
- elif reverse != None:
+ elif reverse is not None:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
else:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
result = {}
for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
+ if reason is not None and reason != "":
+ reason = reason.replace(",", " ").replace(" ", " ")
+
entry = {
"blocker" : blocker,
"blocked" : blocked,
"first_seen": first_seen,
"last_seen" : last_seen
}
+
if block_level in result:
result[block_level].append(entry)
else:
return result
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
-def mutual(domains: list[str] = Query()):
+def api_mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
fba.cursor.execute(
"SELECT block_level FROM blocks " \
return JSONResponse(status_code=200, content={})
@router.get(config.get("base_url") + "/scoreboard")
-def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
- if blockers != None and blockers > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
- elif blocked != None and blocked > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
- elif reference != None and reference > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
- elif software != None and software > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
- elif originator != None and originator > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?originator={originator}")
- elif error_code != None and error_code > 0:
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
+def scoreboard(request: Request, mode: str, amount: int):
+ response = None
+
+ if mode == "blocker" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocker&amount={amount}")
+ elif mode == "blocked" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocked&amount={amount}")
+ elif mode == "reference" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=reference&amount={amount}")
+ elif mode == "software" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=software&amount={amount}")
+ elif mode == "command" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=command&amount={amount}")
+ elif mode == "error_code" and amount > 0:
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=error_code&amount={amount}")
else:
raise HTTPException(status_code=400, detail="No filter specified")
- if response == None:
+ if response is None:
raise HTTPException(status_code=500, detail="Could not determine scores")
elif not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)
"slogan" : config.get("slogan"),
"request" : request,
"scoreboard": True,
- "blockers" : blockers,
- "blocked" : blocked,
- "reference" : reference,
- "software" : software,
- "originator": originator,
- "error_code": error_code,
- "scores" : fba.json_from_response(response)
+ "mode" : mode,
+ "amount" : amount,
+ "scores" : network.json_from_response(response)
})
@router.get(config.get("base_url") + "/")
})
@router.get(config.get("base_url") + "/top")
-def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
+def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
if domain == "" or reason == "" or reverse == "":
raise HTTPException(status_code=500, detail="Insufficient parameter provided")
info = response.json()
response = None
- if domain != None:
+ if domain is not None:
if not validators.domain(domain):
raise HTTPException(status_code=500, detail="Invalid domain")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
- elif reason != None:
+ elif reason is not None:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
- elif reverse != None:
+ elif reverse is not None:
if not validators.domain(reverse):
raise HTTPException(status_code=500, detail="Invalid domain")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
- if response != None:
+ if response is not None:
if not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)
blocklist = response.json()
@router.get(config.get("base_url") + "/rss")
def rss(request: Request, domain: str = None):
- if domain != None:
+ if domain is not None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
blocklist = []
for blocker, blocked, block_level, reason, first_seen, last_seen in result:
first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
- if reason == None or reason == '':
+ if reason is None or reason == '':
reason = "No reason provided."
else:
reason = "Provided reason: '" + reason + "'"
"blocked" : blocked,
"block_level": block_level,
"reason" : reason,
- "first_seen" : first_seen
+ "first_seen" : first_seen,
+ "last_seen" : last_seen,
})
return templates.TemplateResponse("rss.xml", {