templates = Jinja2Templates(directory="templates")
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
-def api_info():
+def api_info() -> None:
database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances) AS total_websites, (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube', 'takahe', 'gotosocial', 'brighteon', 'wildebeest', 'bookwyrm', 'mitra', 'areionskey', 'mammuthus', 'neodb', 'smithereen', 'vebinet', 'toki', 'snac', 'biblioreads', 'wordpress', 'oolong', 'diaspora')) AS supported_instances, (SELECT COUNT(blocker) FROM blocks) AS total_blocks, (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL) AS erroneous_instances")
row = database.cursor.fetchone()
@router.get(config.get("base_url") + "/api/scoreboard.json", response_class=JSONResponse)
-def api_scoreboard(mode: str, amount: int):
+def api_scoreboard(mode: str, amount: int) -> None:
if amount > config.get("api_limit"):
raise HTTPException(status_code=400, detail="Too many results")
return JSONResponse(status_code=200, content=scores)
@router.get(config.get("base_url") + "/api/list.json", response_class=JSONResponse)
-def api_list(request: Request, mode: str, value: str, amount: int):
+def api_list(request: Request, mode: str, value: str, amount: int) -> None:
if mode is None or value is None or amount is None:
raise HTTPException(status_code=500, detail="No filter specified")
elif amount > config.get("api_limit"):
return domainlist
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def api_index(request: Request, mode: str, value: str, amount: int):
+def api_index(request: Request, mode: str, value: str, amount: int) -> None:
if mode is None or value is None or amount is None:
raise HTTPException(status_code=500, detail="No filter specified")
elif amount > config.get("api_limit"):
return result
@router.get(config.get("base_url") + "/api/domain.json", response_class=JSONResponse)
-def api_domain(domain: str):
+def api_domain(domain: str) -> None:
if domain is None:
raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
return JSONResponse(status_code=200, content=dict(domain_data))
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
-def api_mutual(domains: list[str] = Query()):
+def api_mutual(domains: list[str] = Query()) -> None:
"""Return 200 if federation is open between the two, 4xx otherwise"""
database.cursor.execute(
"SELECT block_level FROM blocks " \
return JSONResponse(status_code=200, content={})
@router.get(config.get("base_url") + "/.well-known/nodeinfo", response_class=JSONResponse)
-def wellknown_nodeinfo(request: Request):
+def wellknown_nodeinfo(request: Request) -> None:
return JSONResponse(status_code=200, content={
"links": ({
"rel" : "http://nodeinfo.diaspora.software/ns/schema/1.0",
})
@router.get(config.get("base_url") + "/nodeinfo/1.0", response_class=JSONResponse)
-def nodeinfo_1_0(request: Request):
+def nodeinfo_1_0(request: Request) -> None:
return JSONResponse(status_code=200, content={
"version": "1.0",
"software": {
})
@router.get(config.get("base_url") + "/api/v1/instance/domain_blocks", response_class=JSONResponse)
-def api_domain_blocks(request: Request):
+def api_domain_blocks(request: Request) -> None:
blocked = blacklist.get_all()
blocking = list()
return JSONResponse(status_code=200, content=blocking)
@router.get(config.get("base_url") + "/api/v1/instance/peers", response_class=JSONResponse)
-def api_peers(request: Request):
+def api_peers(request: Request) -> None:
database.cursor.execute("SELECT domain FROM instances WHERE nodeinfo_url IS NOT NULL")
peers = list()
return JSONResponse(status_code=200, content=peers)
@router.get(config.get("base_url") + "/scoreboard")
-def scoreboard(request: Request, mode: str, amount: int):
+def scoreboard(request: Request, mode: str, amount: int) -> None:
if mode == "":
raise HTTPException(status_code=400, detail="No mode specified")
elif amount <= 0:
})
@router.get(config.get("base_url") + "/list")
-def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")) -> None:
if mode == "detection_mode" and not instances.valid(value, "detection_mode"):
raise HTTPException(status_code=500, detail="Invalid detection mode provided")
})
@router.get(config.get("base_url") + "/top")
-def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")) -> None:
if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
})
@router.get(config.get("base_url") + "/infos")
-def infos(request: Request, domain: str):
+def infos(request: Request, domain: str) -> None:
if domain is None:
raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
})
@router.get(config.get("base_url") + "/rss")
-def rss(request: Request, domain: str = None):
+def rss(request: Request, domain: str = None) -> None:
if domain is not None:
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
})
@router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
-def robots(request: Request):
+def robots(request: Request) -> None:
return templates.TemplateResponse("views/robots.txt", {
"request" : request,
"base_url": config.get("base_url")
})
@router.get(config.get("base_url") + "/")
-def index(request: Request):
+def index(request: Request) -> None:
# Get info
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
# Argument parser
_PARSER = None
-def init_parser():
+def init_parser() -> None:
logger.debug("CALLED!")
global _PARSER
logger.debug("EXIT!")
-def run_command():
+def run_command() -> None:
logger.debug("CALLED!")
args = _PARSER.parse_args()
logger.debug("status=%d - EXIT!", status)
return status
-def shutdown():
+def shutdown() -> None:
logger.debug("Closing database connection ...")
database.connection.close()
locking.release()
logger.debug("exists='%s' - EXIT!", exists)
return exists
-def set_all(key: str, rows: list, value: any):
+def set_all(key: str, rows: list, value: any) -> None:
logger.debug("key='%s',rows()=%d,value[]='%s' - CALLED!", key, len(rows), type(value))
if not isinstance(key, str):
logger.debug("EXIT!")
-def set_sub_key(key: str, sub: str, value: any):
+def set_sub_key(key: str, sub: str, value: any) -> None:
logger.debug("key='%s',sub='%s',value[]='%s' - CALLED!", key, sub, type(value))
if not isinstance(key, str):
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-def raise_on(domain: str):
+def raise_on(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
if not isinstance(domain, str):
lockfile = tempfile.gettempdir() + '/fba.lock'
LOCK = None
-def acquire():
+def acquire() -> None:
global LOCK
logger.debug("CALLED!")
logger.debug("EXIT!")
-def release():
+def release() -> None:
logger.debug("CALLED!")
if LOCK is not None:
logger.debug("Releasing lock ...")
logger.debug("added='%s' - EXIT!", added)
return added
-def csv_block(blocker: str, url: str, command: str):
+def csv_block(blocker: str, url: str, command: str) -> None:
logger.debug("blocker='%s',url='%s',command='%s' - CALLED!", blocker, url, command)
domain_helper.raise_on(blocker)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-def fetch_instances(domain: str, origin: str, software: str, command: str, path: str = None):
+def fetch_instances(domain: str, origin: str, software: str, command: str, path: str = None) -> None:
global _DEPTH
logger.debug("domain='%s',origin='%s',software='%s',command='%s',path='%s',_DEPTH=%d - CALLED!", domain, origin, software, command, path, _DEPTH)
domain_helper.raise_on(domain)
logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
return json_reply
-def send_bot_post(domain: str, blocklist: list):
+def send_bot_post(domain: str, blocklist: list) -> None:
logger.debug("domain='%s',blocklist()=%d - CALLED!", domain, len(blocklist))
domain_helper.raise_on(domain)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-def get_reason(blocker: str, blocked: str, block_level: str):
+def get_reason(blocker: str, blocked: str, block_level: str) -> str:
logger.debug("blocker='%s',blocked='%s',block_level='%s' - CALLED!", blocker, blocked, block_level)
domain_helper.raise_on(blocker)
domain_helper.raise_on(blocked)
logger.debug("row[reason]='%s' - EXIT!", row["reason"])
return row["reason"]
-def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
+def update_reason(reason: str, blocker: str, blocked: str, block_level: str) -> None:
logger.debug("reason='%s',blocker='%s',blocked='%s',block_level='%s' - CALLED!", reason, blocker, blocked, block_level)
domain_helper.raise_on(blocker)
domain_helper.raise_on(blocked)
logger.debug("EXIT!")
-def update_last_seen(blocker: str, blocked: str, block_level: str):
+def update_last_seen(blocker: str, blocked: str, block_level: str) -> None:
logger.debug("blocker='%s',blocked='%s',block_level='%s' - CALLED!", blocker, blocked, block_level)
domain_helper.raise_on(blocker)
domain_helper.raise_on(blocked)
logger.debug("is_blocked='%s' - EXIT!", is_blocked)
return is_blocked
-def add(blocker: str, blocked: str, reason: str, block_level: str):
+def add(blocker: str, blocked: str, reason: str, block_level: str) -> None:
logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
domain_helper.raise_on(blocker)
domain_helper.raise_on(blocked)
logger.debug("is_valid='%s' - EXIT!", is_valid)
return is_valid
-def translate_idnas(rows: list, column: str):
+def translate_idnas(rows: list, column: str) -> None:
logger.debug("rows[]='%s',column='%s' - CALLED!", type(rows), column)
if not isinstance(rows, list):
logger.debug("block_level='%s' - EXIT!", block_level)
return block_level
-def delete(domain: str):
+def delete(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-def add(domain: str, error: dict):
+def add(domain: str, error: dict) -> None:
logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error))
domain_helper.raise_on(domain)
"software" : {},
}
-def _set_data(key: str, domain: str, value: any):
+def _set_data(key: str, domain: str, value: any) -> None:
logger.debug("key='%s',domain='%s',value[]='%s' - CALLED!", key, domain, type(value))
domain_helper.raise_on(domain)
logger.debug("has='%s' - EXIT!", has)
return has
-def update(domain: str):
+def update(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("EXIT!")
-def add(domain: str, origin: str, command: str, path: str = None, software: str = None):
+def add(domain: str, origin: str, command: str, path: str = None, software: str = None) -> None:
logger.debug("domain='%s',origin='%s',command='%s',path='%s',software='%s' - CALLED!", domain, origin, command, path, software)
domain_helper.raise_on(domain)
logger.debug("EXIT!")
-def set_last_nodeinfo(domain: str):
+def set_last_nodeinfo(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("EXIT!")
-def set_last_error(domain: str, error: dict):
+def set_last_error(domain: str, error: dict) -> None:
logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error))
domain_helper.raise_on(domain)
logger.debug("EXIT!")
-def set_success(domain: str):
+def set_success(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("row[]='%s' - EXIT!", type(row))
return row
-def set_last_blocked(domain: str):
+def set_last_blocked(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
_set_data("last_blocked", domain, time.time())
logger.debug("EXIT!")
-def set_last_instance_fetch(domain: str):
+def set_last_instance_fetch(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
_set_data("last_instance_fetch", domain, time.time())
logger.debug("EXIT!")
-def set_last_response_time(domain: str, response_time: float):
+def set_last_response_time(domain: str, response_time: float) -> None:
logger.debug("domain='%s',response_time=%d - CALLED!", domain, response_time)
domain_helper.raise_on(domain)
_set_data("last_response_time", domain, response_time)
logger.debug("EXIT!")
-def set_total_peers(domain: str, peers: list):
+def set_total_peers(domain: str, peers: list) -> None:
logger.debug("domain='%s',peers()=%d - CALLED!", domain, len(peers))
domain_helper.raise_on(domain)
_set_data("total_peers", domain, len(peers))
logger.debug("EXIT!")
-def set_total_blocks(domain: str, blocks: list):
+def set_total_blocks(domain: str, blocks: list) -> None:
logger.debug("domain='%s',blocks()=%d - CALLED!", domain, len(blocks))
domain_helper.raise_on(domain)
_set_data("total_blocks", domain, len(blocks))
logger.debug("EXIT!")
-def set_obfuscated_blocks(domain: str, obfuscated: int):
+def set_obfuscated_blocks(domain: str, obfuscated: int) -> None:
logger.debug("domain='%s',obfuscated=%d - CALLED!", domain, obfuscated)
domain_helper.raise_on(domain)
_set_data("obfuscated_blocks", domain, obfuscated)
logger.debug("EXIT!")
-def set_nodeinfo_url(domain: str, url: str):
+def set_nodeinfo_url(domain: str, url: str) -> None:
logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
domain_helper.raise_on(domain)
_set_data("nodeinfo_url", domain, url)
logger.debug("EXIT!")
-def set_detection_mode(domain: str, mode: str):
+def set_detection_mode(domain: str, mode: str) -> None:
logger.debug("domain='%s',mode='%s' - CALLED!", domain, mode)
domain_helper.raise_on(domain)
_set_data("detection_mode", domain, mode)
logger.debug("EXIT!")
-def set_has_obfuscation(domain: str, status: bool):
+def set_has_obfuscation(domain: str, status: bool) -> None:
logger.debug("domain='%s',status='%s' - CALLED!", domain, status)
domain_helper.raise_on(domain)
_set_data("has_obfuscation", domain, status)
logger.debug("EXIT!")
-def set_original_software(domain: str, software: str):
+def set_original_software(domain: str, software: str) -> None:
logger.debug("domain='%s',software='%s' - CALLED!", domain, software)
domain_helper.raise_on(domain)
_set_data("original_software", domain, software)
logger.debug("EXIT!")
-def set_software(domain: str, software: str):
+def set_software(domain: str, software: str) -> None:
logger.debug("domain='%s',software='%s' - CALLED!", domain, software)
domain_helper.raise_on(domain)
logger.debug("is_valid='%s' - EXIT!", is_valid)
return is_valid
-def delete(domain: str):
+def delete(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("EXIT!")
-def translate_idnas(rows: list, column: str):
+def translate_idnas(rows: list, column: str) -> None:
logger.debug("rows[]='%s' - CALLED!", type(rows))
if not isinstance(rows, list):
logger.debug("recent='%s' - EXIT!", recent)
return recent
-def update (source_domain: str):
+def update (source_domain: str) -> None:
logger.debug("source_domain='%s' - CALLED!", source_domain)
domain_helper.raise_on(source_domain)