# DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...")
instances.update_data(domain)
- print(f"INFO: Checking {len(peerlist)} instances from {domain} ...")
+ print(f"INFO: Checking {len(peerlist)} instances from domain='{domain}' ...")
for instance in peerlist:
# DEBUG: print(f"DEBUG: instance='{instance}'")
if instance is None:
# DEBUG: print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...")
nodeinfo = fetch_wellknown_nodeinfo(domain)
- # DEBUG: print(f"DEBUG: nodeinfo[{type(nodeinfo)}]='{nodeinfo}'")
+
+ # DEBUG: print(f"DEBUG: nodeinfo[{type(nodeinfo)}]()='{len(nodeinfo)}'")
+ if "error_message" not in nodeinfo and "json" in nodeinfo and len(nodeinfo["json"]) > 0:
+ # DEBUG: print(f"DEBUG: Found nodeinfo[json]()={len(nodeinfo['json'])} - EXIT!")
+ return nodeinfo["json"]
# No CSRF by default, you don't have to add network.api_headers by yourself here
headers = tuple()
]
for request in request_paths:
- # DEBUG: print(f"DEBUG: path[{type(path)}]='{path}',request='{request'}")
+ # DEBUG: print(f"DEBUG: path[{type(path)}]='{path}',request='{request}'")
if path is not None and path != "" and path != request:
# DEBUG: print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!")
continue
# Continue raising it
raise data["exception"]
elif "error_message" in data:
- print(f"DEBUG: Returned error_message during fetching nodeinfo: '{data['error_message']}',status_code='{data['status_code']}'")
+ # DEBUG: print(f"DEBUG: Returned error_message during fetching nodeinfo: '{data['error_message']}',status_code='{data['status_code']}'")
return fetch_generator_from_path(domain)
- elif "status" in data["json"] and data["json"]["status"] == "error" and "message" in data["json"]:
- print("WARNING: JSON response is an error:", data["json"]["message"])
- instances.update_last_error(domain, data["json"]["message"])
+ elif "status" in data and data["status"] == "error" and "message" in data:
+ print("WARNING: JSON response is an error:", data["message"])
+ instances.update_last_error(domain, data["message"])
return fetch_generator_from_path(domain)
- elif "message" in data["json"]:
+ elif "message" in data:
print("WARNING: JSON response contains only a message:", data["message"])
- instances.update_last_error(domain, data["json"]["message"])
+ instances.update_last_error(domain, data["message"])
return fetch_generator_from_path(domain)
- elif "software" not in data["json"] or "name" not in data["json"]["software"]:
+ elif "software" not in data or "name" not in data["software"]:
# DEBUG: print(f"DEBUG: JSON response from domain='{domain}' does not include [software][name], fetching / ...")
software = fetch_generator_from_path(domain)
# DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!")
return software
- software = tidyup.domain(data["json"]["software"]["name"])
+ software = tidyup.domain(data["software"]["name"])
# DEBUG: print("DEBUG: sofware after tidyup.domain():", software)
if software in ["akkoma", "rebased"]:
import inspect
import validators
+import bs4
+
from fba import blacklist
from fba import blocks
+from fba import config
from fba import fba
from fba import federation
from fba import instances
+from fba import network
+
from fba.helpers import tidyup
def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
data = rows["metadata"]["federation"]
- if "enabled" in data:
- # DEBUG: print("DEBUG: Instance has no block list to analyze:", domain)
- return
-
if "mrf_simple" in data:
+ # DEBUG: print("DEBUG: Found mrf_simple:", domain)
for block_level, blocklist in (
- {**data["mrf_simple"],
- **{"quarantined_instances": data["quarantined_instances"]}}
+ {
+ **data["mrf_simple"],
+ **{
+ "quarantined_instances": data["quarantined_instances"]
+ }
+ }
).items():
# DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
block_level = tidyup.domain(block_level)
)
searchres = fba.cursor.fetchone()
- print(f"DEBUG: searchres[]='{type(searchres)}'")
+ # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
if searchres is None:
print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
continue
- blocked = searchres[0]
+ blocked = searchres[0]
nodeinfo_url = searchres[1]
# DEBUG: print("DEBUG: Looked up domain:", blocked)
elif not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
continue
-
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
- continue
elif blocked.split(".")[-1] == "arpa":
print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
continue
- elif not instances.is_registered(blocked):
+
+ # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ if not instances.is_registered(blocked):
# DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
else:
# DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
blocks.update_last_seen(domain, blocked, block_level)
- else:
- # DEBUG: print(f"DEBUG: domain='{domain}' has returned zero rows, trying /about/more page ...")
- rows = fetch_blocks_from_about(domain)
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
if "mrf_simple_info" in data:
# DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
for block_level, info in (
- {**data["mrf_simple_info"],
- **(data["quarantined_instances_info"]
- if "quarantined_instances_info" in data
- else {})}
+ {
+ **data["mrf_simple_info"],
+ **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
+ }
).items():
# DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
block_level = tidyup.domain(block_level)
# DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
for blocked, reason in info.items():
- # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
+ # DEBUG: print(f"DEBUG: blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
blocked = tidyup.domain(blocked)
- reason = tidyup.reason(reason) if reason is not None and reason != "" else None
+
+ if isinstance(reason, str):
+ # DEBUG: print("DEBUG: reason[] is a string")
+ reason = tidyup.reason(reason)
+ elif isinstance(reason, dict) and "reason" in reason:
+ # DEBUG: print("DEBUG: reason[] is a dict")
+ reason = tidyup.reason(reason["reason"])
+ elif reason is not None:
+ raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
+
# DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
if blocked == "":
# DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- # DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
- blocks.update_reason(reason["reason"], domain, blocked, block_level)
+ # DEBUG: print(f"DEBUG: Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
+ blocks.update_reason(reason, domain, blocked, block_level)
# DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
for entry in blockdict:
if entry["blocked"] == blocked:
- # DEBUG: print("DEBUG: Updating entry reason:", blocked)
- entry["reason"] = reason["reason"]
+ # DEBUG: print(f"DEBUG: Updating entry reason: blocked='{blocked}',reason='{reason}'")
+ entry["reason"] = reason
fba.connection.commit()
-
# DEBUG: print("DEBUG: EXIT!")
-
-def fetch_blocks_from_about(domain: str) -> dict:
- print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
-
- print("DEBUG: Fetching mastodon blocks from domain:", domain)
- blocklist = {
- "Suspended servers": [],
- "Filtered media" : [],
- "Limited servers" : [],
- "Silenced servers" : [],
- }
-
- doc = None
- for path in ("/about/more", "/about"):
- try:
- print(f"DEBUG: Fetching path='{path}' from domain='{domain}' ...")
- doc = bs4.BeautifulSoup(
- network.fetch_response(
- domain,
- path,
- network.web_headers,
- (config.get("connection_timeout"), config.get("read_timeout"))
- ).text,
- "html.parser",
- )
-
- if len(doc.find_all("h3")) > 0:
- print(f"DEBUG: path='{path}' had some headlines - BREAK!")
- break
-
- except BaseException as exception:
- print("ERROR: Cannot fetch from domain:", domain, exception)
- instances.update_last_error(domain, exception)
- break
-
- print(f"DEBUG: doc[]='{type(doc)}'")
- if doc is None:
- print(f"WARNING: Cannot find any 'h3' tags for domain='{domain}' - EXIT!")
- return blocklist
-
- for header in doc.find_all("h3"):
- header_text = tidyup.reason(header.text)
-
- print(f"DEBUG: header_text='{header_text}'")
- if header_text in language_mapping:
- print(f"DEBUG: header_text='{header_text}'")
- header_text = language_mapping[header_text]
- else:
- print(f"WARNING: header_text='{header_text}' not found in language mapping table")
-
- if header_text in blocklist or header_text.lower() in blocklist:
- # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
- for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocklist[header_text].append(
- {
- "domain": tidyup.domain(line.find("span").text),
- "hash" : tidyup.domain(line.find("span")["title"][9:]),
- "reason": tidyup.reason(line.find_all("td")[1].text),
- }
- )
- else:
- print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
-
- print("DEBUG: Returning blocklist for domain:", domain)
- return {
- "reject" : blocklist["Suspended servers"],
- "media_removal" : blocklist["Filtered media"],
- "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
- }