import re
from fba import *
-router = fastapi.FastAPI(docs_url=fba.config["base_url"] + "/docs", redoc_url=fba.config["base_url"] + "/redoc")
+router = fastapi.FastAPI(docs_url=config.config["base_url"] + "/docs", redoc_url=config.config["base_url"] + "/redoc")
templates = Jinja2Templates(directory="templates")
-@router.get(fba.config["base_url"] + "/api/info.json", response_class=JSONResponse)
+@router.get(config.config["base_url"] + "/api/info.json", response_class=JSONResponse)
def info():
fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
known, indexed, blocks, errorous = fba.cursor.fetchone()
"indexed_instances" : indexed,
"blocks_recorded" : blocks,
"errorous_instances": errorous,
- "slogan" : fba.config["slogan"]
+ "slogan" : config.config["slogan"]
}
-@router.get(fba.config["base_url"] + "/api/top.json", response_class=JSONResponse)
+@router.get(config.config["base_url"] + "/api/top.json", response_class=JSONResponse)
def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None):
if blocked != None:
if blocked > 500:
return scoreboard
-@router.get(fba.config["base_url"] + "/api/index.json", response_class=JSONResponse)
+@router.get(config.config["base_url"] + "/api/index.json", response_class=JSONResponse)
def blocked(domain: str = None, reason: str = None, reverse: str = None):
if domain == None and reason == None and reverse == None:
raise HTTPException(status_code=400, detail="No filter specified")
return result
-@router.get(fba.config["base_url"] + "/api/mutual.json", response_class=JSONResponse)
+@router.get(config.config["base_url"] + "/api/mutual.json", response_class=JSONResponse)
def mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
fba.cursor.execute(
# No known blocks
return JSONResponse(status_code=200, content={})
-@router.get(fba.config["base_url"] + "/scoreboard")
+@router.get(config.config["base_url"] + "/scoreboard")
def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None):
scores = None
if blockers != None and blockers > 0:
- res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?blockers={blockers}")
+ res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?blockers={blockers}")
elif blocked != None and blocked > 0:
- res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?blocked={blocked}")
+ res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?blocked={blocked}")
elif reference != None and reference > 0:
- res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?reference={reference}")
+ res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?reference={reference}")
elif software != None and software > 0:
- res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?software={software}")
+ res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?software={software}")
elif originator != None and originator > 0:
- res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top.json?originator={originator}")
+ res = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/top.json?originator={originator}")
else:
raise HTTPException(status_code=400, detail="No filter specified")
raise HTTPException(status_code=res.status_code, detail=res.text)
return templates.TemplateResponse("scoreboard.html", {
- "base_url" : fba.config["base_url"],
- "slogan" : fba.config["slogan"],
+ "base_url" : config.config["base_url"],
+ "slogan" : config.config["slogan"],
"request" : request,
"scoreboard": True,
"blockers" : blockers,
"scores" : res.json()
})
-@router.get(fba.config["base_url"] + "/")
+@router.get(config.config["base_url"] + "/")
def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
if domain == "" or reason == "" or reverse == "":
return fastapi.responses.RedirectResponse("/")
blocks = None
if domain == None and reason == None and reverse == None:
- info = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/info.json")
+ info = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/info.json")
if not info.ok:
raise HTTPException(status_code=info.status_code, detail=info.text)
info = info.json()
elif domain != None:
- blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/index.json?domain={domain}")
+ blocks = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/index.json?domain={domain}")
elif reason != None:
- blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/index.json?reason={reason}")
+ blocks = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/index.json?reason={reason}")
elif reverse != None:
- blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/index.json?reverse={reverse}")
+ blocks = requests.get(f"http://{config.config['host']}:{config.config['port']}{config.config['base_url']}/api/index.json?reverse={reverse}")
if blocks != None:
if not blocks.ok:
"info" : info
})
-@router.get(fba.config["base_url"] + "/rss")
+@router.get(config.config["base_url"] + "/rss")
def rss(request: Request, domain: str = None):
if domain != None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
"Content-Type": "routerlication/rss+xml"
})
-@router.get(fba.config["base_url"] + "/robots.txt", response_class=PlainTextResponse)
+@router.get(config.config["base_url"] + "/robots.txt", response_class=PlainTextResponse)
def robots(request: Request):
return templates.TemplateResponse("robots.txt", {
"request" : request,
- "base_url": fba.config["base_url"]
+ "base_url": config.config["base_url"]
})
if __name__ == "__main__":
- uvicorn.run("api:router", host=fba.config["host"], port=fba.config["port"], log_level=fba.config["log_level"])
+ uvicorn.run("api:router", host=config.config["host"], port=config.config["port"], log_level=config.config["log_level"])
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import json
import sys
import validators
from fba import *
-__all__ = ['boot', 'cache', 'fba', 'instances']
+__all__ = ['boot', 'cache', 'config', 'fba', 'instances']
--- /dev/null
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import json
+
+with open("config.json") as f:
+ config = json.loads(f.read())
import validators
from fba import cache
+from fba import config
from fba import instances
-with open("config.json") as f:
- config = json.loads(f.read())
-
# Don't check these, known trolls/flooders/testing/developing
blacklist = [
# Floods network with fake nodes as "research" project
# HTTP headers for non-API requests
headers = {
- "User-Agent": config["useragent"],
+ "User-Agent": config.config["useragent"],
}
# HTTP headers for API requests
api_headers = {
- "User-Agent": config["useragent"],
+ "User-Agent": config.config["useragent"],
"Content-Type": "application/json",
}
])
# Cleanup old entries
- # DEBUG: print(f"DEBUG: Purging old records (distance: {config['error_log_cleanup']})")
- cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config["error_log_cleanup"]])
+ # DEBUG: print(f"DEBUG: Purging old records (distance: {config.config['error_log_cleanup']})")
+ cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.config["error_log_cleanup"]])
except BaseException as e:
print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
if software == "misskey":
# DEBUG: print(f"DEBUG: domain='{domain}' is misskey, sending API POST request ...")
offset = 0
- step = config["misskey_offset"]
+ step = config.config["misskey_offset"]
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config['misskey_offset']}'")
- offset = offset + (config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.config["misskey_offset"]:
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.config['misskey_offset']}'")
+ offset = offset + (config.config["misskey_offset"] - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
elif software == "lemmy":
# DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
try:
- res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',data[]='{type(data)}'")
# DEBUG: print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
try:
- res = reqto.get(f"https://{domain}/api/v1/server/{mode}?start={start}&count=100", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v1/server/{mode}?start={start}&count=100", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',data[]='{type(data)}'")
# DEBUG: print(f"DEBUG: Fetching get_peers_url='{get_peers_url}' from '{domain}' ...")
try:
- res = reqto.get(f"https://{domain}{get_peers_url}", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}{get_peers_url}", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
if not res.ok or res.status_code >= 400:
# DEBUG: print(f"DEBUG: Was not able to fetch '{get_peers_url}', trying alternative ...")
- res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/api/v3/site", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
# DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
data = {}
try:
- res = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.post(f"https://{domain}{path}", data=parameter, headers={**api_headers, **extra_headers}, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
try:
# DEBUG: print("DEBUG: Fetching request:", request)
- res = reqto.get(request, headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(request, headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
data = {}
try:
- res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=api_headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
data = res.json()
# DEBUG: print("DEBUG: domain,res.ok,data[]:", domain, res.ok, type(data))
try:
# DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
- res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
+ res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
# DEBUG: print("DEBUG: domain,res.ok,res.status_code,res.text[]:", domain, res.ok, res.status_code, type(res.text))
if res.ok and res.status_code < 300 and len(res.text) > 0:
if truncated:
message = message + "(the list has been truncated to the first 20 entries)"
- botheaders = {**api_headers, **{"Authorization": "Bearer " + config["bot_token"]}}
+ botheaders = {**api_headers, **{"Authorization": "Bearer " + config.config["bot_token"]}}
req = reqto.post(
- f"{config['bot_instance']}/api/v1/statuses",
+ f"{config.config['bot_instance']}/api/v1/statuses",
data={
"status" : message,
- "visibility" : config['bot_visibility'],
+ "visibility" : config.config['bot_visibility'],
"content_type": "text/plain"
},
headers=botheaders,
try:
doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/about", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
+ reqto.get(f"https://{domain}/about", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
"html.parser",
)
except BaseException as e:
try:
doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
+ reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
"html.parser",
)
except BaseException as e:
}
offset = 0
- step = config["misskey_offset"]
+ step = config.config["misskey_offset"]
while True:
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config['misskey_offset']}'")
- offset = offset + (config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.config["misskey_offset"]:
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.config['misskey_offset']}'")
+ offset = offset + (config.config["misskey_offset"] - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
if len(fetched) == 0:
# DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
break
- elif len(fetched) != config["misskey_offset"]:
- # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config['misskey_offset']}'")
- offset = offset + (config["misskey_offset"] - len(fetched))
+ elif len(fetched) != config.config["misskey_offset"]:
+ # DEBUG: print(f"DEBUG: Fetched '{len(fetched)}' row(s) but expected: '{config.config['misskey_offset']}'")
+ offset = offset + (config.config["misskey_offset"] - len(fetched))
else:
# DEBUG: print("DEBUG: Raising offset by step:", step)
offset = offset + step
boot.acquire_lock()
fba.cursor.execute(
- "SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe') AND (last_blocked IS NULL OR last_blocked < ?) ORDER BY rowid DESC", [time.time() - fba.config["recheck_block"]]
+ "SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe') AND (last_blocked IS NULL OR last_blocked < ?) ORDER BY rowid DESC", [time.time() - config.config["recheck_block"]]
)
rows = fba.cursor.fetchall()
# handling CSRF, I've saw at least one server requiring it to access the endpoint
# DEBUG: print("DEBUG: Fetching meta:", blocker)
meta = bs4.BeautifulSoup(
- reqto.get(f"https://{blocker}/", headers=fba.headers, timeout=(fba.config["connection_timeout"], fba.config["read_timeout"])).text,
+ reqto.get(f"https://{blocker}/", headers=fba.headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
"html.parser",
)
try:
reqheaders = fba.api_headers
# DEBUG: print("DEBUG: Querying API domain_blocks:", blocker)
- blocks = reqto.get(f"https://{blocker}/api/v1/instance/domain_blocks", headers=reqheaders, timeout=(fba.config["connection_timeout"], fba.config["read_timeout"])).json()
+ blocks = reqto.get(f"https://{blocker}/api/v1/instance/domain_blocks", headers=reqheaders, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).json()
print(f"INFO: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}' ...")
for block in blocks:
print("INFO: blocker:", blocker)
try:
# Blocks
- federation = reqto.get(f"https://{blocker}{fba.get_peers_url}?filter=suspended", headers=fba.api_headers, timeout=(fba.config["connection_timeout"], fba.config["read_timeout"])).json()
+ federation = reqto.get(f"https://{blocker}{fba.get_peers_url}?filter=suspended", headers=fba.api_headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).json()
if (federation == None):
print("WARNING: No valid response:", blocker);
else:
print("WARNING: Unknown software:", blocker, software)
- if fba.config["bot_enabled"] and len(blockdict) > 0:
+ if config.config["bot_enabled"] and len(blockdict) > 0:
send_bot_post(blocker, blockdict)
blockdict = []
try:
doc = bs4.BeautifulSoup(
- reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(fba.config["connection_timeout"], fba.config["read_timeout"])).text,
+ reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"])).text,
"html.parser",
)
# DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
domains = list()
try:
print(f"INFO: Fetch FBA-specific RSS feed='{feed}' ...")
- res = reqto.get(feed, headers=fba.headers, timeout=(fba.config["connection_timeout"], fba.config["read_timeout"]))
+ res = reqto.get(feed, headers=fba.headers, timeout=(config.config["connection_timeout"], config.config["read_timeout"]))
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code='{res.status_code}',res.text()={len(res.text)}")
if res.ok and res.status_code < 300 and len(res.text) > 0:
# Loop through some instances
fba.cursor.execute(
- "SELECT domain, origin, software, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe', 'lemmy') AND (last_instance_fetch IS NULL OR last_instance_fetch < ?) ORDER BY rowid DESC", [time.time() - fba.config["recheck_instance"]]
+ "SELECT domain, origin, software, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial', 'bookwyrm', 'takahe', 'lemmy') AND (last_instance_fetch IS NULL OR last_instance_fetch < ?) ORDER BY rowid DESC", [time.time() - config.config["recheck_instance"]]
)
rows = fba.cursor.fetchall()