# DEBUG: print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...")
nodeinfo = fetch_wellknown_nodeinfo(domain)
-
# DEBUG: print(f"DEBUG: nodeinfo[{type(nodeinfo)}]='{nodeinfo}'")
- if "error_message" in nodeinfo:
- print(f"WARNING: Error during fetching nodeinfo: '{nodeinfo['error_message']}' - EXIT!")
- return nodeinfo
# No CSRF by default, you don't have to add network.api_headers by yourself here
headers = tuple()
data = fetch_nodeinfo(domain, path)
# DEBUG: print(f"DEBUG: data[{type(data)}]='{data}'")
- if "error_message" in data:
- # DEBUG: print(f"DEBUG: Could not determine software type, domain='{domain}'")
- if "exception" in data:
- # Continue raising it
- raise data["exception"]
- else:
- # Raise generic exception if none is attached
- raise Exception(f"Cannot fetch nodeinfo from domain='{domain}': '{data['error_message']}'")
-
- # DEBUG: print("DEBUG: data():", len(data), data)
- if "status" in data["json"] and data["json"]["status"] == "error" and "message" in data["json"]:
+ if "exception" in data:
+ # Continue raising it
+ raise data["exception"]
+ elif "error_message" in data:
+ print(f"DEBUG: Returned error_message during fetching nodeinfo: '{data['error_message']}',status_code='{data['status_code']}'")
+ return fetch_generator_from_path(domain)
+ elif "status" in data["json"] and data["json"]["status"] == "error" and "message" in data["json"]:
print("WARNING: JSON response is an error:", data["json"]["message"])
instances.update_last_error(domain, data["json"]["message"])
return fetch_generator_from_path(domain)
print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',data()={len(data)},response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
json_reply["status_code"] = response.status_code
json_reply["error_message"] = response.reason
+ del json_reply["json"]
instances.update_last_error(domain, response)
- except requests.exceptions.ConnectionError as exception:
+ except exceptions as exception:
# DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
json_reply["status_code"] = 999
json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
print(f"WARNING: Cannot query JSON API: url='{url}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
json_reply["status_code"] = response.status_code
json_reply["error_message"] = response.reason
+ del json_reply["json"]
- except requests.exceptions.ConnectionError as exception:
+ except exceptions as exception:
# DEBUG: print(f"DEBUG: Fetching '{url}' failed. exception[{type(exception)}]='{str(exception)}'")
json_reply["status_code"] = 999
json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
timeout=timeout
)
- except requests.exceptions.ConnectionError as exception:
+ except exceptions as exception:
# DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
json_reply["status_code"] = 999
json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
json_reply["status_code"] = response.status_code
json_reply["error_message"] = response.reason
+ del json_reply["json"]
instances.update_last_error(domain, response)
# DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
timeout=timeout
)
- except requests.exceptions.ConnectionError as exception:
+ except exceptions as exception:
# DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
instances.update_last_error(domain, exception)
raise exception
continue
# DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
- for blocked in blocklist:
- # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
- blocked = tidyup.domain(blocked)
- # DEBUG: print("DEBUG: AFTER blocked:", blocked)
-
- if blocked == "":
- print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
- continue
- elif blacklist.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
- continue
- elif blocked.count("*") > 1:
- # -ACK!-oma also started obscuring domains without hash
- fba.cursor.execute(
- "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
- )
- searchres = fba.cursor.fetchone()
+ if len(blocklist) > 0:
+ for blocked in blocklist:
+ # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
+ blocked = tidyup.domain(blocked)
+ # DEBUG: print("DEBUG: AFTER blocked:", blocked)
- print(f"DEBUG: searchres[]='{type(searchres)}'")
- if searchres is None:
- print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
+ if blocked == "":
+ print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
+ continue
+ elif blacklist.is_blacklisted(blocked):
+ # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
continue
+ elif blocked.count("*") > 1:
+ # -ACK!-oma also started obscuring domains without hash
+ fba.cursor.execute(
+ "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
+ )
+ searchres = fba.cursor.fetchone()
- blocked = searchres[0]
- nodeinfo_url = searchres[1]
- # DEBUG: print("DEBUG: Looked up domain:", blocked)
- elif not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
- continue
+ print(f"DEBUG: searchres[]='{type(searchres)}'")
+ if searchres is None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
+ continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
- continue
- elif blocked.split(".")[-1] == "arpa":
- print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif not instances.is_registered(blocked):
- # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
- instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
+ blocked = searchres[0]
+ nodeinfo_url = searchres[1]
+ # DEBUG: print("DEBUG: Looked up domain:", blocked)
+ elif not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
+ continue
- if not blocks.is_instance_blocked(domain, blocked, block_level):
- # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
- blocks.add_instance(domain, blocked, "unknown", block_level)
+ # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ if not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
+ continue
+ elif blocked.split(".")[-1] == "arpa":
+ print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
+ continue
+ elif not instances.is_registered(blocked):
+ # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
+
+ if not blocks.is_instance_blocked(domain, blocked, block_level):
+ # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
+ blocks.add_instance(domain, blocked, "unknown", block_level)
- if block_level == "reject":
- # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
- blockdict.append({
- "blocked": blocked,
- "reason" : None
- })
- else:
- # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
- blocks.update_last_seen(domain, blocked, block_level)
+ if block_level == "reject":
+ # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
+ blockdict.append({
+ "blocked": blocked,
+ "reason" : None
+ })
+ else:
+ # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
+ blocks.update_last_seen(domain, blocked, block_level)
+ else:
+ # DEBUG: print(f"DEBUG: domain='{domain}' has returned zero rows, trying /about/more page ...")
+ rows = fetch_blocks_from_about(domain)
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()
fba.connection.commit()
# DEBUG: print("DEBUG: EXIT!")
+
def fetch_blocks_from_about(domain: str) -> dict:
    """Scrape the block lists published on a domain's "about" page.

    The paths "/about/more" and "/about" are tried in that order. Fetching
    stops at the first page that contains at least one <h3> headline, or at
    the first fetch/parse error (which is recorded through
    instances.update_last_error()). Each headline is translated through
    language_mapping; when it names one of the known block categories, the
    rows of the first table following it (minus the header row) are collected.

    Args:
        domain: Non-empty host name of the instance to query.

    Returns:
        A dict with the keys "reject", "media_removal" and "followers_only".
        Each value is a list of dicts with the keys "domain", "hash" and
        "reason". The same shape, with empty lists, is returned when no page
        could be fetched at all.

    Raises:
        ValueError: If domain is not a str or is an empty string.
    """
    print(f"DEBUG: domain='{domain}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    print("DEBUG: Fetching mastodon blocks from domain:", domain)
    blocklist = {
        "Suspended servers": [],
        "Filtered media"   : [],
        "Limited servers"  : [],
        "Silenced servers" : [],
    }

    # Lets headlines match a category regardless of letter case while still
    # indexing blocklist with its real (capitalised) key.
    canonical_keys = {key.lower(): key for key in blocklist}

    doc = None
    for path in ("/about/more", "/about"):
        try:
            print(f"DEBUG: Fetching path='{path}' from domain='{domain}' ...")
            doc = bs4.BeautifulSoup(
                network.fetch_response(
                    domain,
                    path,
                    network.web_headers,
                    (config.get("connection_timeout"), config.get("read_timeout"))
                ).text,
                "html.parser",
            )

            if doc.find_all("h3"):
                print(f"DEBUG: path='{path}' had some headlines - BREAK!")
                break

        except Exception as exception:
            # Only Exception is caught: KeyboardInterrupt/SystemExit must be
            # allowed to terminate the program instead of being logged away.
            print("ERROR: Cannot fetch from domain:", domain, exception)
            instances.update_last_error(domain, exception)
            break

    print(f"DEBUG: doc[]='{type(doc)}'")
    if doc is None:
        print(f"WARNING: Cannot find any 'h3' tags for domain='{domain}' - EXIT!")
        headers = []
    else:
        headers = doc.find_all("h3")

    for header in headers:
        header_text = tidyup.reason(header.text)

        print(f"DEBUG: header_text='{header_text}'")
        if header_text in language_mapping:
            print(f"DEBUG: header_text='{header_text}'")
            header_text = language_mapping[header_text]
        else:
            print(f"WARNING: header_text='{header_text}' not found in language mapping table")

        category = canonical_keys.get(header_text.lower())
        if category is None:
            print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
            continue

        # find_all_next() instead of find_next_siblings() to account for
        # instances that e.g. hide lists in a dropdown menu
        tables = header.find_all_next("table")
        if not tables:
            print(f"WARNING: header_text='{header_text}' is not followed by any table - SKIPPED!")
            continue

        # The first <tr> is the table's header row
        for line in tables[0].find_all("tr")[1:]:
            span = line.find("span")
            cells = line.find_all("td")
            if span is None or not span.has_attr("title") or len(cells) < 2:
                print(f"WARNING: Malformed block row below header_text='{header_text}' - SKIPPED!")
                continue

            blocklist[category].append(
                {
                    "domain": tidyup.domain(span.text),
                    # The first 9 characters of the title are dropped -
                    # presumably a "SHA-256: " style prefix, TODO confirm
                    "hash"  : tidyup.domain(span["title"][9:]),
                    "reason": tidyup.reason(cells[1].text),
                }
            )

    print("DEBUG: Returning blocklist for domain:", domain)
    return {
        "reject"        : blocklist["Suspended servers"],
        "media_removal" : blocklist["Filtered media"],
        "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
    }