import inspect
import validators
+import bs4
+
from fba import blacklist
from fba import blocks
+from fba import config
from fba import fba
from fba import federation
from fba import instances
+from fba import network
+
from fba.helpers import tidyup
def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
# Blocks
blockdict = list()
- rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
+ rows = None
+ try:
+ rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching nodeinfo")
if rows is None:
print("WARNING: Could not fetch nodeinfo from domain:", domain)
data = rows["metadata"]["federation"]
- if "enabled" in data:
- # DEBUG: print("DEBUG: Instance has no block list to analyze:", domain)
- return
-
if "mrf_simple" in data:
+ # DEBUG: print("DEBUG: Found mrf_simple:", domain)
for block_level, blocklist in (
- {**data["mrf_simple"],
- **{"quarantined_instances": data["quarantined_instances"]}}
+ {
+ **data["mrf_simple"],
+ **{
+ "quarantined_instances": data["quarantined_instances"]
+ }
+ }
).items():
# DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
block_level = tidyup.domain(block_level)
)
searchres = fba.cursor.fetchone()
- print(f"DEBUG: searchres[]='{type(searchres)}'")
+ # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
if searchres is None:
print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
continue
- blocked = searchres[0]
+ blocked = searchres[0]
nodeinfo_url = searchres[1]
# DEBUG: print("DEBUG: Looked up domain:", blocked)
elif not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
continue
-
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
- continue
elif blocked.split(".")[-1] == "arpa":
print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
continue
- elif not instances.is_registered(blocked):
+
+ # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ if not instances.is_registered(blocked):
+ # Commit changes
+ fba.connection.commit()
+
# DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
else:
# DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
blocks.update_last_seen(domain, blocked, block_level)
- else:
- # DEBUG: print(f"DEBUG: domain='{domain}' has returned zero rows, trying /about/more page ...")
- rows = fetch_blocks_from_about(domain)
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
if "mrf_simple_info" in data:
# DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
for block_level, info in (
- {**data["mrf_simple_info"],
- **(data["quarantined_instances_info"]
- if "quarantined_instances_info" in data
- else {})}
+ {
+ **data["mrf_simple_info"],
+ **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
+ }
).items():
# DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
block_level = tidyup.domain(block_level)
# DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
for blocked, reason in info.items():
- # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
+ # DEBUG: print(f"DEBUG: blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
blocked = tidyup.domain(blocked)
- reason = tidyup.reason(reason) if reason is not None and reason != "" else None
+
+ if isinstance(reason, str):
+ # DEBUG: print("DEBUG: reason[] is a string")
+ reason = tidyup.reason(reason)
+ elif isinstance(reason, dict) and "reason" in reason:
+ # DEBUG: print("DEBUG: reason[] is a dict")
+ reason = tidyup.reason(reason["reason"])
+ elif reason is not None:
+ raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
+
# DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
if blocked == "":
print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
continue
elif not instances.is_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodein
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- # DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
- blocks.update_reason(reason["reason"], domain, blocked, block_level)
+ # DEBUG: print(f"DEBUG: Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
+ blocks.update_reason(reason, domain, blocked, block_level)
# DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
for entry in blockdict:
if entry["blocked"] == blocked:
- # DEBUG: print("DEBUG: Updating entry reason:", blocked)
- entry["reason"] = reason["reason"]
+ # DEBUG: print(f"DEBUG: Updating entry reason: blocked='{blocked}',reason='{reason}'")
+ entry["reason"] = reason
fba.connection.commit()
-
# DEBUG: print("DEBUG: EXIT!")
-
-def fetch_blocks_from_about(domain: str) -> dict:
- print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
-
- print("DEBUG: Fetching mastodon blocks from domain:", domain)
- blocklist = {
- "Suspended servers": [],
- "Filtered media" : [],
- "Limited servers" : [],
- "Silenced servers" : [],
- }
-
- doc = None
- for path in ("/about/more", "/about"):
- try:
- print(f"DEBUG: Fetching path='{path}' from domain='{domain}' ...")
- doc = bs4.BeautifulSoup(
- network.fetch_response(
- domain,
- path,
- network.web_headers,
- (config.get("connection_timeout"), config.get("read_timeout"))
- ).text,
- "html.parser",
- )
-
- if len(doc.find_all("h3")) > 0:
- print(f"DEBUG: path='{path}' had some headlines - BREAK!")
- break
-
- except BaseException as exception:
- print("ERROR: Cannot fetch from domain:", domain, exception)
- instances.update_last_error(domain, exception)
- break
-
- print(f"DEBUG: doc[]='{type(doc)}'")
- if doc is None:
- print(f"WARNING: Cannot find any 'h3' tags for domain='{domain}' - EXIT!")
- return blocklist
-
- for header in doc.find_all("h3"):
- header_text = tidyup.reason(header.text)
-
- print(f"DEBUG: header_text='{header_text}'")
- if header_text in language_mapping:
- print(f"DEBUG: header_text='{header_text}'")
- header_text = language_mapping[header_text]
- else:
- print(f"WARNING: header_text='{header_text}' not found in language mapping table")
-
- if header_text in blocklist or header_text.lower() in blocklist:
- # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
- for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocklist[header_text].append(
- {
- "domain": tidyup.domain(line.find("span").text),
- "hash" : tidyup.domain(line.find("span")["title"][9:]),
- "reason": tidyup.reason(line.find_all("td")[1].text),
- }
- )
- else:
- print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
-
- print("DEBUG: Returning blocklist for domain:", domain)
- return {
- "reject" : blocklist["Suspended servers"],
- "media_removal" : blocklist["Filtered media"],
- "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
- }