__pycache__/
venv/
config.json
+*.cover
+*.pyc
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+from datetime import datetime
+from email import utils
+
+import re
+
from fastapi import Request, HTTPException, Query
from fastapi.responses import JSONResponse
from fastapi.responses import PlainTextResponse
-from fastapi.templating import Jinja2Templates
-from datetime import datetime
-from email import utils
import fastapi
import uvicorn
import requests
-import re
import validators
from fba import config
from fba import fba
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
-templates = Jinja2Templates(directory="templates")
+templates = fastapi.templating.Jinja2Templates(directory="templates")
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
-def info():
+def api_info():
fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
known, indexed, blocklist, errorous = fba.cursor.fetchone()
}
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
+def api_top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, command: int = None, error_code: int = None):
if blocked is not None:
if blocked > 500:
raise HTTPException(status_code=400, detail="Too many results")
scores = fba.cursor.fetchall()
- scoreboard = []
+ scores = []
for domain, highscore in scores:
- scoreboard.append({
+ scores.append({
"domain" : domain,
"highscore": highscore
})
- return scoreboard
+ return scores
@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
-def blocked(domain: str = None, reason: str = None, reverse: str = None):
+def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
if domain is None and reason is None and reverse is None:
raise HTTPException(status_code=400, detail="No filter specified")
return result
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
-def mutual(domains: list[str] = Query()):
+def api_mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
fba.cursor.execute(
"SELECT block_level FROM blocks " \
"recheck_instance" : 172800,
"recheck_block" : 43200,
"misskey_limit" : 100,
- "error_log_cleanup" : 604800
+ "error_log_cleanup" : 604800,
+ "write_error_log" : "true"
}
'cache',
'commands',
'config',
+ 'csrf',
'federation',
'fba',
'instances',
# DEBUG: print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',reason='{reason}' - EXIT!")
return
- except BaseException as exception:
- print(f"ERROR: failed SQL query: reason='{reason}',blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: reason='{reason}',blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
# DEBUG: print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}' - EXIT!")
return
- except BaseException as exception:
- print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
time.time()
),
)
- except BaseException as exception:
- print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
def acquire_lock():
global LOCK
try:
- print(f"DEBUG: Acquiring lock: '{lockfile}'")
+ # DEBUG: print(f"DEBUG: Acquiring lock: '{lockfile}'")
LOCK = zc.lockfile.LockFile(lockfile)
- print("DEBUG: Lock obtained.")
+ # DEBUG: print("DEBUG: Lock obtained.")
except zc.lockfile.LockError:
print(f"ERROR: Cannot aquire lock: '{lockfile}'")
sys.exit(100)
def init_parser():
- # DEBUG: print("DEBUG: init_parser(): CALLED!")
+ # DEBUG: # DEBUG: print("DEBUG: init_parser(): CALLED!")
global _PARSER
- # DEBUG: print("DEBUG: Initializing parser ...")
+ # DEBUG: # DEBUG: print("DEBUG: Initializing parser ...")
_PARSER = argparse.ArgumentParser(
description="Fetches block reasons from the fediverse",
epilog="Please note that some commands have optional arguments, you may want to try fba.py <command> --help to find them out.",
parser.add_argument("--single", action="store_true", help="Only fetch given instance.")
parser.set_defaults(command=commands.fetch_instances)
- # DEBUG: print("DEBUG: init_parser(): EXIT!")
+ # DEBUG: # DEBUG: print("DEBUG: init_parser(): EXIT!")
def run_command():
- # DEBUG: print("DEBUG: run_command(): CALLED!")
+ # DEBUG: # DEBUG: print("DEBUG: run_command(): CALLED!")
args = _PARSER.parse_args()
- # DEBUG: print(f"DEBUG: args[{type(args)}]={args}")
+ # DEBUG: # DEBUG: print(f"DEBUG: args[{type(args)}]={args}")
status = args.command(args)
- # DEBUG: print("DEBUG: status={status} - EXIT!")
+ # DEBUG: # DEBUG: print("DEBUG: status={status} - EXIT!")
return status if isinstance(status, int) else 0
def shutdown():
- print("DEBUG: Closing database connection ...")
+ # DEBUG: print("DEBUG: Closing database connection ...")
fba.connection.close()
if LOCK is not None:
- print("DEBUG: Releasing lock ...")
+ # DEBUG: print("DEBUG: Releasing lock ...")
LOCK.close()
- print(f"DEBUG: Deleting lockfile='{lockfile}' ...")
+ # DEBUG: print(f"DEBUG: Deleting lockfile='{lockfile}' ...")
os.remove(lockfile)
- print("DEBUG: Shutdown completed.")
+ # DEBUG: print("DEBUG: Shutdown completed.")
# DEBUG: print(f"DEBUG: Adding domain='{entry['domain']}' ...")
domains.append(entry["domain"])
- except BaseException as exception:
- print(f"ERROR: Cannot fetch graphql,exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: Cannot fetch graphql,exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
- except BaseException as exception:
- print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: blocker='{blocker}',software='{software}',exc[{type(exc)}]:'{str(exc)}'")
else:
print("WARNING: Unknown software:", blocker, software)
# DEBUG: print(f"DEBUG: blocked[]={type(blocked)}")
domains["reject"] = domains["reject"] + fba.find_domains(blocked)
- except BaseException as exception:
- print(f"ERROR: Cannot fetch from meta.chaos.social,exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: Cannot fetch from meta.chaos.social,exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print(f"DEBUG: Parsing RSS feed ...")
+ # DEBUG: print(f"DEBUG: Parsing RSS feed ({len(response.text)} Bytes) ...")
rss = atoma.parse_rss_bytes(response.content)
# DEBUG: print(f"DEBUG: rss[]={type(rss)}")
# DEBUG: print(f"DEBUG: Adding domain='{domain}'")
domains.append(domain)
- except BaseException as exception:
- print(f"ERROR: Cannot fetch args.feed='{args.feed}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: Cannot fetch args.feed='{args.feed}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print(f"DEBUG: Parsing ATOM feed ...")
+ # DEBUG: print(f"DEBUG: Parsing ATOM feed ({len(response.text)} Bytes) ...")
atom = atoma.parse_atom_bytes(response.content)
# DEBUG: print(f"DEBUG: atom[]={type(atom)}")
# DEBUG: print(f"DEBUG: Adding domain='{domain}',domains()={len(domains)}")
domains.append(domain)
- except BaseException as exception:
- print(f"ERROR: Cannot fetch feed='{feed}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: Cannot fetch feed='{feed}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print(f"DEBUG: domains({len(domains)})={domains}")
fba.fetch_instances(args.domain, None, None, inspect.currentframe().f_code.co_name)
if args.single:
- # DEBUG: print(f"DEBUG: Not fetching more instances - EXIT!")
+ # DEBUG: print("DEBUG: Not fetching more instances - EXIT!")
return
# Loop through some instances
rows = fba.cursor.fetchall()
print(f"INFO: Checking {len(rows)} entries ...")
for row in rows:
- # DEBUG: print("DEBUG: domain:", row[0])
+ # DEBUG: print(f"DEBUG: domain='{row[0]}'")
if blacklist.is_blacklisted(row[0]):
print("WARNING: domain is blacklisted:", row[0])
continue
# DEBUG: print(f"DEBUG: response[]='{type(response)}'")
if response.ok and response.content != "":
# DEBUG: print(f"DEBUG: Fetched {len(response.content)} Bytes, parsing CSV ...")
- #print(f"DEBUG: response.content={response.content}")
+ ## DEBUG: print(f"DEBUG: response.content={response.content}")
reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect='unix')
#, fieldnames='domain,severity,reject_media,reject_reports,public_comment,obfuscate'
# DEBUG: print(f"DEBUG: reader[]={type(reader)}")
import json
with open("config.json") as f:
+ # DEBUG: print("DEBUG: Loading configuration file ...")
_config = json.loads(f.read())
def get(key: str) -> any:
--- /dev/null
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import bs4
+import reqto
+
+from fba import config
+#from fba import instances
+from fba import network
+
+def determine(domain: str, headers: dict) -> dict:
+ # DEBUG: print(f"DEBUG: domain='{domain}',headers()={len(headers)} - CALLED!")
+ if not isinstance(domain, str):
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ elif domain == "":
+ raise ValueError("Parameter 'domain' is empty")
+ elif not isinstance(headers, dict):
+ raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
+
+ # Default headers with no CSRF
+ reqheaders = headers
+
+ try:
+ # Fetch / to check for meta tag indicating csrf
+ # DEBUG: print(f"DEBUG: Fetching / from domain='{domain}' for CSRF check ...")
+ response = reqto.get(
+ f"https://{domain}/",
+ headers=network.web_headers,
+ timeout=(config.get("connection_timeout"), config.get("read_timeout"))
+ )
+
+ # DEBUG: print(f"DEBUG: response.ok='{response.ok}',response.status_code={response.status_code},response.text()={len(response.text)}")
+ if response.ok and len(response.text) > 0:
+ meta = bs4.BeautifulSoup(
+ response.text,
+ "html.parser"
+ )
+ # DEBUG: print(f"DEBUG: meta[]='{type(meta)}'")
+
+ tag = meta.find("meta", attrs={"name": "csrf-token"})
+ # DEBUG: print(f"DEBUG: tag={tag}")
+
+ csrf = tag["content"]
+ # DEBUG: print(f"DEBUG: Adding CSRF token='{csrf}' for domain='{domain}'")
+
+ reqheaders = {**headers, **{"X-CSRF-Token": csrf}}
+
+ except BaseException as exc:
+ # DEBUG: print(f"DEBUG: No CSRF token found, using normal headers: domain='{domain}',exc[{type(exc)}]={exc}")
+ pass
+
+ # DEBUG: print(f"DEBUG: reqheaders()={len(reqheaders)} - EXIT!")
+ return reqheaders
import sys
import time
+from urllib.parse import urlparse
+
import bs4
import requests
import validators
-from urllib.parse import urlparse
-
from fba import blacklist
from fba import config
from fba import instances
elif not isinstance(origin, str) and origin is not None:
raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
elif software is None:
- print(f"DEBUG: software for domain='{domain}' is not set, determining ...")
+ # DEBUG: print(f"DEBUG: software for domain='{domain}' is not set, determining ...")
software = determine_software(domain, path)
- print(f"DEBUG: Determined software='{software}' for domain='{domain}'")
+ # DEBUG: print(f"DEBUG: Determined software='{software}' for domain='{domain}'")
elif not isinstance(software, str):
raise ValueError(f"Parameter software[]={type(software)} is not 'str'")
elif not isinstance(script, str):
# DEBUG: print("DEBUG: Fetching instances for domain:", domain, software)
peerlist = fetch_peers(domain, software)
- if (peerlist is None):
+ if peerlist is None:
print("ERROR: Cannot fetch peers:", domain)
return
elif instances.has_pending_instance_data(domain):
if not instances.is_registered(instance):
# DEBUG: print("DEBUG: Adding new instance:", instance, domain)
instances.add(instance, domain, script)
- except BaseException as exception:
- print(f"ERROR: instance='{instance}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: instance='{instance}',exc[{type(exc)}]:'{str(exc)}'")
continue
# DEBUG: print("DEBUG: EXIT!")
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
+ elif config.get("write_error_log").lower() != "true":
+ # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
+ return
try:
# DEBUG: print("DEBUG: BEFORE response[]:", type(response))
# Cleanup old entries
# DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
- except BaseException as exception:
- print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: domain='{domain}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
peers = list()
try:
response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
data = json_from_response(response)
-
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+
if not response.ok or response.status_code >= 400:
- # DEBUG: print(f"DEBUG: Was not able to fetch peers, trying alternative ...")
+ # DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...")
response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
# DEBUG: print("DEBUG: Querying API was successful:", domain, len(data))
peers = data
- except BaseException as exception:
- print("WARNING: Some error during get():", domain, exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ print("WARNING: Some error during fetch_peers():", domain, exc)
+ instances.update_last_error(domain, exc)
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instances.set("total_peers", domain, len(peers))
+ instances.set_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
instances.update_last_instance_fetch(domain)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if response.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Success:", request)
- instances.set("detection_mode", domain, "STATIC_CHECK")
- instances.set("nodeinfo_url" , domain, request)
+ instances.set_data("detection_mode", domain, "STATIC_CHECK")
+ instances.set_data("nodeinfo_url" , domain, request)
break
elif response.ok and isinstance(data, list):
print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
instances.update_last_error(domain, response)
continue
- except BaseException as exception:
+ except BaseException as exc:
# DEBUG: print("DEBUG: Cannot fetch API request:", request)
- instances.update_last_error(domain, exception)
+ instances.update_last_error(domain, exc)
pass
# DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
# DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
if response.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
- instances.set("detection_mode", domain, "AUTO_DISCOVERY")
- instances.set("nodeinfo_url" , domain, link["href"])
+ instances.set_data("detection_mode", domain, "AUTO_DISCOVERY")
+ instances.set_data("nodeinfo_url" , domain, link["href"])
break
else:
print("WARNING: Unknown 'rel' value:", domain, link["rel"])
else:
print("WARNING: nodeinfo does not contain 'links':", domain)
- except BaseException as exception:
+ except BaseException as exc:
print("WARNING: Failed fetching .well-known info:", domain)
- instances.update_last_error(domain, exception)
+ instances.update_last_error(domain, exc)
pass
# DEBUG: print("DEBUG: Returning data[]:", type(data))
# DEBUG: print("DEBUG: Found generator meta tag:", domain)
software = tidyup_domain(generator.get("content"))
print(f"INFO: domain='{domain}' is generated by '{software}'")
- instances.set("detection_mode", domain, "GENERATOR")
+ instances.set_data("detection_mode", domain, "GENERATOR")
remove_pending_error(domain)
elif isinstance(site_name, bs4.element.Tag):
# DEBUG: print("DEBUG: Found property=og:site_name:", domain)
sofware = tidyup_domain(site_name.get("content"))
print(f"INFO: domain='{domain}' has og:site_name='{software}'")
- instances.set("detection_mode", domain, "SITE_NAME")
+ instances.set_data("detection_mode", domain, "SITE_NAME")
remove_pending_error(domain)
- except BaseException as exception:
- # DEBUG: print(f"DEBUG: Cannot fetch / from '{domain}':", exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ # DEBUG: print(f"DEBUG: Cannot fetch / from '{domain}':", exc)
+ instances.update_last_error(domain, exc)
pass
# DEBUG: print(f"DEBUG: software[]={type(software)}")
software = "misskey"
elif software.find("/") > 0:
print("WARNING: Spliting of slash:", software)
- software = tidyup_domain(software.split("/")[-1]);
+ software = tidyup_domain(software.split("/")[-1])
elif software.find("|") > 0:
print("WARNING: Spliting of pipe:", software)
- software = tidyup_domain(software.split("|")[0]);
+ software = tidyup_domain(software.split("|")[0])
elif "powered by" in software:
# DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it")
software = strip_powered_by(software)
print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
continue
elif domain == "gab.com/.ai, develop.gab.com":
- # DEBUG: print(f"DEBUG: Multiple domains detected in one row")
+ # DEBUG: print("DEBUG: Multiple domains detected in one row")
domains.append({
"domain": "gab.com",
"reason": reason,
print("WARNING: JSON response does not contain 'federated_instances':", domain)
instances.update_last_error(domain, response)
- except BaseException as exception:
- print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"WARNING: Exception during fetching JSON: domain='{domain}',exc[{type(exc)}]:'{str(exc)}'")
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instances.set("total_peers", domain, len(peers))
+ instances.set_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
instances.update_last_instance_fetch(domain)
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import bs4
import inspect
+
+import bs4
import validators
from fba import blacklist
network.fetch_response(domain, "/about/more", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
- except BaseException as exception:
- print("ERROR: Cannot fetch from domain:", domain, exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ print("ERROR: Cannot fetch from domain:", domain, exc)
+ instances.update_last_error(domain, exc)
return {}
for header in doc.find_all("h3"):
"report_removal": [],
}
- # handling CSRF, I've saw at least one server requiring it to access the endpoint
- # DEBUG: print("DEBUG: Fetching meta:", domain)
- meta = bs4.BeautifulSoup(
- network.fetch_response(domain, "/", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
- "html.parser",
- )
- try:
- csrf = meta.find("meta", attrs={"name": "csrf-token"})["content"]
- # DEBUG: print("DEBUG: Adding CSRF token:", domain, csrf)
- reqheaders = {**network.api_headers, **{"X-CSRF-Token": csrf}}
- except BaseException as exception:
- # DEBUG: print("DEBUG: No CSRF token found, using normal headers:", domain, exception)
- reqheaders = network.api_headers
-
# DEBUG: print("DEBUG: Querying API domain_blocks:", domain)
- blocklist = network.fetch_response(domain, "/api/v1/instance/domain_blocks", reqheaders, (config.get("connection_timeout"), config.get("read_timeout"))).json()
+ response = network.fetch_response(domain, "/api/v1/instance/domain_blocks", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
+ blocklist = fba.json_from_response(response)
print(f"INFO: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon' ...")
for block in blocklist:
else:
print("WARNING: Unknown severity:", block['severity'], block['domain'])
- except BaseException as exception:
- # DEBUG: print(f"DEBUG: Failed, trying mastodon-specific fetches: domain='{domain}',exception[{type(exception)}]={str(exception)}")
+ except BaseException as exc:
+ # DEBUG: print(f"DEBUG: Failed, trying mastodon-specific fetches: domain='{domain}',exc[{type(exc)}]={str(exc)}")
rows = fetch_blocks_from_about(domain)
print(f"INFO: Checking {len(rows.items())} entries from domain='{domain}',software='mastodon' ...")
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
- except BaseException as exception:
- print(f"ERROR: domain='{domain}',software='mastodon',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: domain='{domain}',software='mastodon',exc[{type(exc)}]:'{str(exc)}'")
# DEBUG: print("DEBUG: EXIT!")
break
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instances.set("total_peers", domain, len(peers))
+ instances.set_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
instances.update_last_instance_fetch(domain)
# DEBUG: print(f"DEBUG: count={count}")
if count == 0:
- # DEBUG: print(f"DEBUG: API is no more returning new instances, aborting loop!")
+ # DEBUG: print("DEBUG: API is no more returning new instances, aborting loop!")
break
- except BaseException as exception:
- print("WARNING: Caught error, exiting loop:", domain, exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ print("WARNING: Caught error, exiting loop:", domain, exc)
+ instances.update_last_error(domain, exc)
offset = 0
break
# DEBUG: print(f"DEBUG: count={count}")
if count == 0:
- # DEBUG: print(f"DEBUG: API is no more returning new instances, aborting loop!")
+ # DEBUG: print("DEBUG: API is no more returning new instances, aborting loop!")
break
- except BaseException as exception:
- print("ERROR: Exception during POST:", domain, exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ print("ERROR: Exception during POST:", domain, exc)
+ instances.update_last_error(domain, exc)
offset = 0
break
# DEBUG: print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
try:
- response = network.fetch_response(domain, "/api/v1/server/{mode}?start={start}&count=100", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_response(domain, "/api/v1/server/{mode}?start={start}&count=100", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = fba.json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
# Continue with next row
start = start + 100
- except BaseException as exception:
- print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"WARNING: Exception during fetching JSON: domain='{domain}',exc[{type(exc)}]:'{str(exc)}'")
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instances.set("total_peers", domain, len(peers))
+ instances.set_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
instances.update_last_instance_fetch(domain)
# DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
blocks.update_reason(reason["reason"], domain, blocked, block_level)
- # DEBUG: print(f"DEBUG: blockdict()={count(blockdict)")
+ # DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
for entry in blockdict:
if entry["blocked"] == blocked:
# DEBUG: print("DEBUG: Updating entry reason:", blocked)
entry["reason"] = reason["reason"]
fba.connection.commit()
- except BaseException as exception:
- print(f"ERROR: domain='{domain}',software='pleroma',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: domain='{domain}',software='pleroma',exc[{type(exc)}]:'{str(exc)}'")
# DEBUG: print("DEBUG: EXIT!")
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
-import requests
import sys
import time
+
+import requests
import validators
from fba import blacklist
"last_error_details" : {},
}
-def set(key: str, domain: str, value: any):
+def set_data(key: str, domain: str, value: any):
# DEBUG: print(f"DEBUG: key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!")
if not isinstance(key, str):
raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
except:
pass
- except BaseException as exception:
- print(f"ERROR: failed SQL query: domain='{domain}',sql_string='{sql_string}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: domain='{domain}',sql_string='{sql_string}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
raise ValueError("Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain)
- set("last_instance_fetch", domain, time.time())
+ set_data("last_instance_fetch", domain, time.time())
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
raise ValueError("Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
- set("last_blocked", domain, time.time())
+ set_data("last_blocked", domain, time.time())
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
if has_pending_instance_data(domain):
# DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo being updated ...")
- set("last_status_code" , domain, None)
- set("last_error_details", domain, None)
+ set_data("last_status_code" , domain, None)
+ set_data("last_error_details", domain, None)
update_data(domain)
fba.remove_pending_error(domain)
update_last_error(domain, fba.pending_errors[domain])
fba.remove_pending_error(domain)
- except BaseException as exception:
- print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: domain='{domain}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
else:
# DEBUG: print("DEBUG: Updating nodeinfo for domain:", domain)
raise ValueError("Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
- set("last_nodeinfo", domain, time.time())
- set("last_updated" , domain, time.time())
+ set_data("last_nodeinfo", domain, time.time())
+ set_data("last_updated" , domain, time.time())
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
# DEBUG: print("DEBUG: AFTER response[]:", type(response))
if isinstance(response, str):
- # DEBUG: print(f"DEBUG: Setting last_error_details='{response}'");
- set("last_status_code" , domain, 999)
- set("last_error_details", domain, response)
+ # DEBUG: print(f"DEBUG: Setting last_error_details='{response}'")
+ set_data("last_status_code" , domain, 999)
+ set_data("last_error_details", domain, response)
else:
- # DEBUG: print(f"DEBUG: Setting last_error_details='{response.reason}'");
- set("last_status_code" , domain, response.status_code)
- set("last_error_details", domain, response.reason)
+ # DEBUG: print(f"DEBUG: Setting last_error_details='{response.reason}'")
+ set_data("last_status_code" , domain, response.status_code)
+ set_data("last_error_details", domain, response.reason)
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
# DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
if not cache.key_exists("is_registered"):
- # DEBUG: print(f"DEBUG: Cache for 'is_registered' not initialized, fetching all rows ...")
+ # DEBUG: print("DEBUG: Cache for 'is_registered' not initialized, fetching all rows ...")
try:
fba.cursor.execute("SELECT domain FROM instances")
# Check Set all
cache.set_all("is_registered", fba.cursor.fetchall(), True)
- except BaseException as exception:
- print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: domain='{domain}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# Is cache found?
import requests
from fba import config
+from fba import csrf
from fba import fba
from fba import instances
elif not isinstance(parameter, str):
raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
- # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
+ # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
+ headers = csrf.determine(domain, {**api_headers, **extra_headers})
+
data = {}
+
try:
+ # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',parameter='{parameter}',extra_headers({len(extra_headers)})={extra_headers}")
response = reqto.post(
f"https://{domain}{path}",
data=parameter,
- headers={**api_headers, **extra_headers},
+ headers=headers,
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
instances.update_last_error(domain, response)
- except BaseException as exception:
- print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exc[{type(exc)}]:'{str(exc)}'")
# DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
return data
fetch_response(domain, "/friendica", web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
"html.parser",
)
- except BaseException as exception:
- print("WARNING: Failed to fetch /friendica from domain:", domain, exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ print("WARNING: Failed to fetch /friendica from domain:", domain, exc)
+ instances.update_last_error(domain, exc)
return {}
blocklist = doc.find(id="about_blocklist")
elif path == "":
raise ValueError("Parameter 'path' is empty")
+ # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}',headers()='{len(headers)}' ...")
+ headers = csrf.determine(domain, headers)
+
try:
- # DEBUG: print(f"DEBUG: Sending request to '{domain}{path}' ...")
+ # DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
response = reqto.get(
f"https://{domain}{path}",
headers=headers,
timeout=timeout
- );
- except requests.exceptions.ConnectionError as exception:
- # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
- instances.update_last_error(domain, exception)
- raise exception
+ )
+
+ except requests.exceptions.ConnectionError as exc:
+ # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exc[{type(exc)}]='{str(exc)}'")
+ instances.update_last_error(domain, exc)
+ raise exc
# DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
return response