import re
from fba import *
-router = fastapi.FastAPI(docs_url=config.config["base_url"] + "/docs", redoc_url=config.config["base_url"] + "/redoc")
+router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
templates = Jinja2Templates(directory="templates")
-@router.get(config.config["base_url"] + "/api/info.json", response_class=JSONResponse)
+@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def info():
fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
known, indexed, blocks, errorous = fba.cursor.fetchone()
"indexed_instances" : indexed,
"blocks_recorded" : blocks,
"errorous_instances": errorous,
- "slogan" : config.config["slogan"]
+ "slogan" : config.get("slogan")
}
-@router.get(config.config["base_url"] + "/api/top.json", response_class=JSONResponse)
+@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None):
if blocked != None:
if blocked > 500:
return scoreboard
-@router.get(config.config["base_url"] + "/api/index.json", response_class=JSONResponse)
+@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
def blocked(domain: str = None, reason: str = None, reverse: str = None):
if domain == None and reason == None and reverse == None:
raise HTTPException(status_code=400, detail="No filter specified")
return result
-@router.get(config.config["base_url"] + "/api/mutual.json", response_class=JSONResponse)
+@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
def mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
fba.cursor.execute(
# No known blocks
return JSONResponse(status_code=200, content={})
-@router.get(config.config["base_url"] + "/scoreboard")
+@router.get(config.get("base_url") + "/scoreboard")
def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None):
scores = None
if blockers != None and blockers > 0:
- res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?blockers={blockers}")
+ res = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
elif blocked != None and blocked > 0:
- res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?blocked={blocked}")
+ res = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
elif reference != None and reference > 0:
- res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?reference={reference}")
+ res = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
elif software != None and software > 0:
- res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?software={software}")
+ res = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
elif originator != None and originator > 0:
- res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?originator={originator}")
+ res = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?originator={originator}")
else:
raise HTTPException(status_code=400, detail="No filter specified")
raise HTTPException(status_code=res.status_code, detail=res.text)
return templates.TemplateResponse("scoreboard.html", {
- "base_url" : config.config["base_url"],
- "slogan" : config.config["slogan"],
+ "base_url" : config.get("base_url"),
+ "slogan" : config.get("slogan"),
"request" : request,
"scoreboard": True,
"blockers" : blockers,
"scores" : res.json()
})
-@router.get(config.config["base_url"] + "/")
+@router.get(config.get("base_url") + "/")
def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
if domain == "" or reason == "" or reverse == "":
return fastapi.responses.RedirectResponse("/")
blocks = None
if domain == None and reason == None and reverse == None:
- info = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/info.json")
+ info = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
if not info.ok:
raise HTTPException(status_code=info.status_code, detail=info.text)
info = info.json()
elif domain != None:
- blocks = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/index.json?domain={domain}")
+ blocks = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
elif reason != None:
- blocks = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/index.json?reason={reason}")
+ blocks = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
elif reverse != None:
- blocks = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/index.json?reverse={reverse}")
+ blocks = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
if blocks != None:
if not blocks.ok:
"info" : info
})
-@router.get(config.config["base_url"] + "/rss")
+@router.get(config.get("base_url") + "/rss")
def rss(request: Request, domain: str = None):
if domain != None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
"Content-Type": "routerlication/rss+xml"
})
-@router.get(config.config["base_url"] + "/robots.txt", response_class=PlainTextResponse)
+@router.get(config.get("base_url") + "/robots.txt", response_class=PlainTextResponse)
def robots(request: Request):
return templates.TemplateResponse("robots.txt", {
"request" : request,
- "base_url": config.config["base_url"]
+ "base_url": config.get("base_url")
})
if __name__ == "__main__":
- uvicorn.run("api:router", host=config.config["host"], port=config.config["port"], log_level=config.config["log_level"])
+ uvicorn.run("api:router", host=config.get("host"), port=config.get("port"), log_level=config.get("log_level"))
with open("config.json") as f:
config = json.loads(f.read())
+
+def get(key: str) -> any:
+ # DEBUG: print(f"DEBUG: key[{type(key)}]={key} - CALLED!")
+ if type(key) != str:
+ raise ValueError(f"Parameter key[]='{type(key)}' is not 'str'")
+ elif key == "":
+ raise ValueError("Parameter 'key' cannot be empty")
+ elif not key in config:
+ raise KeyError(f"key='{key}' does not exist in config array")
+
+ # DEBUG: print(f"DEBUG: config[{key}]={config[key]} - EXIT!")
+ return config[key]
\ No newline at end of file
# HTTP headers for non-API requests
headers = {
- "User-Agent": config.config["useragent"],
+ "User-Agent": config.get("useragent"),
}
# HTTP headers for API requests
api_headers = {
- "User-Agent": config.config["useragent"],
+ "User-Agent": config.get("useragent"),
"Content-Type": "application/json",
}
])
# Cleanup old entries
- # DEBUG: print(f"DEBUG: Purging old records (distance: {config.config['error_log_cleanup']})")
- cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.config["error_log_cleanup"]])
+ # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
+ cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
except BaseException as e:
print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
if software == "misskey":
# DEBUG: print(f"DEBUG: domain='{domain}' is misskey, sending API POST request ...")
offset = 0
- step = config.config["misskey_offset"]
+ step = config.get("misskey_offset")
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config.config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.config['misskey_offset']}'")
- offset = offset + (config.config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.get("misskey_offset"):
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_offset')}'")
+ offset = offset + (config.get("misskey_offset") - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
elif software == "lemmy":
# DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
try:
- res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',data[]='{type(data)}'")
# DEBUG: print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
try:
- res = reqto.get(f"https://{domain}/api/v1/server/{mode}?start={start}&count=100", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v1/server/{mode}?start={start}&count=100", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',data[]='{type(data)}'")
# DEBUG: print(f"DEBUG: Fetching get_peers_url='{get_peers_url}' from '{domain}' ...")
try:
- res = reqto.get(f"https://{domain}{get_peers_url}", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(f"https://{domain}{get_peers_url}", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
if not res.ok or res.status_code >= 400:
# DEBUG: print(f"DEBUG: Was not able to fetch '{get_peers_url}', trying alternative ...")
- res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
# DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
data = {}
try:
- res = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
try:
# DEBUG: print("DEBUG: Fetching request:", request)
- res = reqto.get(request, headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(request, headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
data = {}
try:
- res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
data = res.json()
# DEBUG: print("DEBUG: domain,res.ok,data[]:", domain, res.ok, type(data))
try:
# DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
- res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
# DEBUG: print("DEBUG: domain,res.ok,res.status_code,res.text[]:", domain, res.ok, res.status_code, type(res.text))
if res.ok and res.status_code < 300 and len(res.text) > 0:
if truncated:
message = message + "(the list has been truncated to the first 20 entries)"
- botheaders = {**api_headers, **{"Authorization": "Bearer " + config.config["bot_token"]}}
+ botheaders = {**api_headers, **{"Authorization": "Bearer " + config.get("bot_token")}}
req = reqto.post(
- f"{config.config['bot_instance']}/api/v1/statuses",
+ f"{config.get('bot_instance')}/api/v1/statuses",
data={
"status" : message,
- "visibility" : config.config['bot_visibility'],
+ "visibility" : config.get('bot_visibility'),
"content_type": "text/plain"
},
headers=botheaders,
try:
doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/about", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
+ reqto.get(f"https://{domain}/about", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
except BaseException as e:
try:
doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
+ reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
except BaseException as e:
}
offset = 0
- step = config.config["misskey_offset"]
+ step = config.get("misskey_offset")
while True:
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config.config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.config['misskey_offset']}'")
- offset = offset + (config.config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.get("misskey_offset"):
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_offset')}'")
+ offset = offset + (config.get("misskey_offset") - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config.config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.config['misskey_offset']}'")
- offset = offset + (config.config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.get("misskey_offset"):
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.get('misskey_offset')}'")
+ offset = offset + (config.get("misskey_offset") - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
boot.acquire_lock()
fba.cursor.execute(
- "SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe') AND (last_blocked IS NULL OR last_blocked < ?) ORDER BY rowid DESC", [time.time() - config.config["recheck_block"]]
+ "SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe') AND (last_blocked IS NULL OR last_blocked < ?) ORDER BY rowid DESC", [time.time() - config.get("recheck_block")]
)
rows = fba.cursor.fetchall()
# handling CSRF, I've saw at least one server requiring it to access the endpoint
# DEBUG: print("DEBUG: Fetching meta:", blocker)
meta = bs4.BeautifulSoup(
- reqto.get(f"https://{blocker}/", headers=fba.headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
+ reqto.get(f"https://{blocker}/", headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
try:
reqheaders = fba.api_headers
# DEBUG: print("DEBUG: Querying API domain_blocks:", blocker)
- blocks = reqto.get(f"https://{blocker}/api/v1/instance/domain_blocks", headers=reqheaders, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).json()
+ blocks = reqto.get(f"https://{blocker}/api/v1/instance/domain_blocks", headers=reqheaders, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).json()
print(f"INFO: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}' ...")
for block in blocks:
print("INFO: blocker:", blocker)
try:
# Blocks
- federation = reqto.get(f"https://{blocker}{fba.get_peers_url}?filter=suspended", headers=fba.api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).json()
+ federation = reqto.get(f"https://{blocker}{fba.get_peers_url}?filter=suspended", headers=fba.api_headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).json()
if (federation == None):
print("WARNING: No valid response:", blocker);
else:
print("WARNING: Unknown software:", blocker, software)
- if config.config["bot_enabled"] and len(blockdict) > 0:
+ if config.get("bot_enabled") and len(blockdict) > 0:
send_bot_post(blocker, blockdict)
blockdict = []
try:
doc = bs4.BeautifulSoup(
- reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
+ reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
# DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
domains = list()
try:
print(f"INFO: Fetch FBA-specific RSS feed='{feed}' ...")
- res = reqto.get(feed, headers=fba.headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
+ res = reqto.get(feed, headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout")))
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',res.text()={len(res.text)}")
if res.ok and res.status_code < 300 and len(res.text) > 0:
# Loop through some instances
fba.cursor.execute(
- "SELECT domain, origin, software, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe', 'lemmy') AND (last_instance_fetch IS NULL OR last_instance_fetch < ?) ORDER BY rowid DESC", [time.time() - config.config["recheck_instance"]]
+ "SELECT domain, origin, software, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe', 'lemmy') AND (last_instance_fetch IS NULL OR last_instance_fetch < ?) ORDER BY rowid DESC", [time.time() - config.get("recheck_instance")]
)
rows = fba.cursor.fetchall()