From d4e620c6fb05c9c1741a89c412fe1e14cfb37940 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Sat, 3 Jun 2023 21:54:12 +0200 Subject: [PATCH] Continued: - renamed tidyup() to tidyup_domains() - so tidyup_reason() can come (WIP) --- fba/fba.py | 116 ++++++++++++++++++++++++------------------------ fetch_blocks.py | 24 +++++----- fetch_cs.py | 19 +++++++- 3 files changed, 87 insertions(+), 72 deletions(-) diff --git a/fba/fba.py b/fba/fba.py index 1e74092..228f98c 100644 --- a/fba/fba.py +++ b/fba/fba.py @@ -182,11 +182,11 @@ def fetch_instances(domain: str, origin: str, software: str, script: str, path: continue # DEBUG: print(f"DEBUG: instance[{type(instance}]={instance} - BEFORE") - instance = tidyup(instance) + instance = tidyup_domain(instance) # DEBUG: print(f"DEBUG: instance[{type(instance}]={instance} - AFTER") if instance == "": - print("WARNING: Empty instance after tidyup(), domain:", domain) + print("WARNING: Empty instance after tidyup_domain(), domain:", domain) continue elif not validators.domain(instance.split("/")[0]): print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}',software='{software}'") @@ -235,7 +235,7 @@ def add_peers(rows: dict) -> list: # DEBUG: print(f"DEBUG: Adding {len(rows[element])} peer(s) to peers list ...") for peer in rows[element]: # DEBUG: print(f"DEBUG: peer='{peer}' - BEFORE!") - peer = tidyup(peer) + peer = tidyup_domain(peer) # DEBUG: print(f"DEBUG: peer='{peer}' - AFTER!") if is_blacklisted(peer): @@ -941,13 +941,13 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: # DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'") if isinstance(generator, bs4.element.Tag): # DEBUG: print("DEBUG: Found generator meta tag:", domain) - software = tidyup(generator.get("content")) + software = tidyup_domain(generator.get("content")) print(f"INFO: domain='{domain}' is generated by '{software}'") set_instance_data("detection_mode", domain, "GENERATOR") remove_pending_error(domain) elif isinstance(site_name, bs4.element.Tag): # DEBUG: print("DEBUG: Found property=og:site_name:", domain) - sofware = tidyup(site_name.get("content")) + sofware = tidyup_domain(site_name.get("content")) print(f"INFO: domain='{domain}' has og:site_name='{software}'") set_instance_data("detection_mode", domain, "SITE_NAME") remove_pending_error(domain) @@ -1018,9 +1018,9 @@ def determine_software(domain: str, path: str = None) -> str: # DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!") return software - software = tidyup(data["software"]["name"]) + software = tidyup_domain(data["software"]["name"]) - # DEBUG: print("DEBUG: sofware after tidyup():", software) + # DEBUG: print("DEBUG: sofware after tidyup_domain():", software) if software in ["akkoma", "rebased"]: # DEBUG: print("DEBUG: Setting pleroma:", domain, software) software = "pleroma" @@ -1035,7 +1035,7 @@ def determine_software(domain: str, path: str = None) -> str: software = software.split("/")[-1]; elif software.find("|") > 0: print("WARNING: Spliting of pipe:", software) - software = tidyup(software.split("|")[0]); + software = tidyup_domain(software.split("|")[0]); elif "powered by" in software: # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") software = strip_powered_by(software) @@ -1048,7 +1048,7 @@ def determine_software(domain: str, path: str = None) -> str: # DEBUG: print(f"DEBUG: software[]={type(software)}") if software == "": - print("WARNING: tidyup() left no software name behind:", domain) + print("WARNING: tidyup_domain() left no software name behind:", domain) software = None # DEBUG: print(f"DEBUG: software[]={type(software)}") @@ -1126,6 +1126,35 @@ def update_last_seen(blocker: str, blocked: str, block_level: str): # DEBUG: print("DEBUG: EXIT!") +def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool: + # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!") + if type(blocker) != str: + raise ValueError(f"Parameter blocker[]={type(blocker)} is not of type 'str'") + elif blocker == "": + raise ValueError("Parameter 'blocker' cannot be empty") + elif type(blocked) != str: + raise ValueError(f"Parameter blocked[]={type(blocked)} is not of type 'str'") + elif blocked == "": + raise ValueError("Parameter 'blocked' cannot be empty") + elif type(block_level) != str: + raise ValueError(f"Parameter block_level[]={type(block_level)} is not of type 'str'") + elif block_level == "": + raise ValueError("Parameter 'block_level' cannot be empty") + + cursor.execute( + "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1", + ( + blocker, + blocked, + block_level + ), + ) + + is_blocked = cursor.fetchone() != None + + # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!") + return is_blocked + def block_instance(blocker: str, blocked: str, reason: str, block_level: str): # DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level) if type(blocker) != str: @@ -1313,7 +1342,7 @@ def get_mastodon_blocks(domain: str) -> dict: return {} for header in doc.find_all("h3"): - header_text = tidyup(header.text) + header_text = tidyup_domain(header.text) if header_text in language_mapping: # DEBUG: print(f"DEBUG: header_text='{header_text}'") @@ -1324,9 +1353,9 @@ def get_mastodon_blocks(domain: str) -> dict: for line in header.find_all_next("table")[0].find_all("tr")[1:]: blocks[header_text].append( { - "domain": tidyup(line.find("span").text), - "hash" : tidyup(line.find("span")["title"][9:]), - "reason": tidyup(line.find_all("td")[1].text), + "domain": tidyup_domain(line.find("span").text), + "hash" : tidyup_domain(line.find("span")["title"][9:]), + "reason": tidyup_domain(line.find_all("td")[1].text), } ) @@ -1367,8 +1396,8 @@ def get_friendica_blocks(domain: str) -> dict: for line in blocklist.find("table").find_all("tr")[1:]: # DEBUG: print(f"DEBUG: line='{line}'") blocks.append({ - "domain": tidyup(line.find_all("td")[0].text), - "reason": tidyup(line.find_all("td")[1].text) + "domain": tidyup_domain(line.find_all("td")[0].text), + "reason": tidyup_domain(line.find_all("td")[1].text) }) # DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks)) @@ -1431,7 +1460,7 @@ def get_misskey_blocks(domain: str) -> dict: if instance["isSuspended"]: blocks["suspended"].append( { - "domain": tidyup(instance["host"]), + "domain": tidyup_domain(instance["host"]), # no reason field, nothing "reason": None } @@ -1478,7 +1507,7 @@ def get_misskey_blocks(domain: str) -> dict: for instance in fetched: if instance["isBlocked"]: blocks["blocked"].append({ - "domain": tidyup(instance["host"]), + "domain": tidyup_domain(instance["host"]), "reason": None }) @@ -1497,57 +1526,28 @@ def get_misskey_blocks(domain: str) -> dict: "followers_only": blocks["suspended"] } -def tidyup(string: str) -> str: - # DEBUG: print(f"DEBUG: string='{string}' - CALLED!") - if type(string) != str: - raise ValueError(f"Parameter string[]={type(string)} is not expected") +def tidyup_domain(domain: str) -> str: + # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!") + if type(domain) != str: + raise ValueError(f"Parameter domain[]={type(domain)} is not expected") # All lower-case and strip spaces out - string = string.lower().strip() + domain = domain.lower().strip() # No port number - string = re.sub("\:\d+$", "", string) + domain = re.sub("\:\d+$", "", domain) # No protocol, sometimes with the slashes - string = re.sub("^https?\:(\/*)", "", string) + domain = re.sub("^https?\:(\/*)", "", domain) # No trailing slash - string = re.sub("\/$", "", string) + domain = re.sub("\/$", "", domain) # No @ sign - string = re.sub("^\@", "", string) + domain = re.sub("^\@", "", domain) # No individual users in block lists - string = re.sub("(.+)\@", "", string) - - # DEBUG: print(f"DEBUG: string='{string}' - EXIT!") - return string - -def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool: - # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!") - if type(blocker) != str: - raise ValueError(f"Parameter blocker[]={type(blocker)} is not of type 'str'") - elif blocker == "": - raise ValueError("Parameter 'blocker' cannot be empty") - elif type(blocked) != str: - raise ValueError(f"Parameter blocked[]={type(blocked)} is not of type 'str'") - elif blocked == "": - raise ValueError("Parameter 'blocked' cannot be empty") - elif type(block_level) != str: - raise ValueError(f"Parameter block_level[]={type(block_level)} is not of type 'str'") - elif block_level == "": - raise ValueError("Parameter 'block_level' cannot be empty") + domain = re.sub("(.+)\@", "", domain) - cursor.execute( - "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1", - ( - blocker, - blocked, - block_level - ), - ) - - is_blocked = cursor.fetchone() != None - - # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!") - return is_blocked + # DEBUG: print(f"DEBUG: domain='{domain}' - EXIT!") + return domain diff --git a/fetch_blocks.py b/fetch_blocks.py index 757685b..a9b8535 100755 --- a/fetch_blocks.py +++ b/fetch_blocks.py @@ -37,7 +37,7 @@ print(f"INFO: Checking {len(rows)} entries ...") for blocker, software, origin, nodeinfo_url in rows: # DEBUG: print("DEBUG: BEFORE blocker,software,origin,nodeinfo_url:", blocker, software, origin, nodeinfo_url) blockdict = [] - blocker = fba.tidyup(blocker) + blocker = fba.tidyup_domain(blocker) # DEBUG: print("DEBUG: AFTER blocker,software:", blocker, software) if blocker == "": @@ -80,7 +80,7 @@ for blocker, software, origin, nodeinfo_url in rows: **{"quarantined_instances": federation["quarantined_instances"]}} ).items(): # DEBUG: print("DEBUG: block_level, blocks():", block_level, len(blocks)) - block_level = fba.tidyup(block_level) + block_level = fba.tidyup_domain(block_level) # DEBUG: print("DEBUG: BEFORE block_level:", block_level) if block_level == "": @@ -90,11 +90,11 @@ for blocker, software, origin, nodeinfo_url in rows: # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...") for blocked in blocks: # DEBUG: print("DEBUG: BEFORE blocked:", blocked) - blocked = fba.tidyup(blocked) + blocked = fba.tidyup_domain(blocked) # DEBUG: print("DEBUG: AFTER blocked:", blocked) if blocked == "": - print("WARNING: blocked is empty after fba.tidyup():", blocker, block_level) + print("WARNING: blocked is empty after fba.tidyup_domain():", blocker, block_level) continue elif fba.is_blacklisted(blocked): # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!") @@ -153,7 +153,7 @@ for blocker, software, origin, nodeinfo_url in rows: else {})} ).items(): # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items())) - block_level = fba.tidyup(block_level) + block_level = fba.tidyup_domain(block_level) # DEBUG: print("DEBUG: BEFORE block_level:", block_level) if block_level == "": @@ -163,11 +163,11 @@ for blocker, software, origin, nodeinfo_url in rows: # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...") for blocked, reason in info.items(): # DEBUG: print("DEBUG: BEFORE blocked:", blocked) - blocked = fba.tidyup(blocked) + blocked = fba.tidyup_domain(blocked) # DEBUG: print("DEBUG: AFTER blocked:", blocked) if blocked == "": - print("WARNING: blocked is empty after fba.tidyup():", blocker, block_level) + print("WARNING: blocked is empty after fba.tidyup_domain():", blocker, block_level) continue elif fba.is_blacklisted(blocked): # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!") @@ -268,7 +268,7 @@ for blocker, software, origin, nodeinfo_url in rows: print(f"INFO: Checking {len(json.items())} entries from blocker='{blocker}',software='{software}' ...") for block_level, blocks in json.items(): # DEBUG: print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks)) - block_level = fba.tidyup(block_level) + block_level = fba.tidyup_domain(block_level) # DEBUG: print("DEBUG: AFTER-block_level:", block_level) if block_level == "": print("WARNING: block_level is empty, blocker:", blocker) @@ -278,7 +278,7 @@ for blocker, software, origin, nodeinfo_url in rows: for block in blocks: blocked, blocked_hash, reason = block.values() # DEBUG: print("DEBUG: blocked,hash,reason:", blocked, blocked_hash, reason) - blocked = fba.tidyup(blocked) + blocked = fba.tidyup_domain(blocked) # DEBUG: print("DEBUG: AFTER-blocked:", blocked) if blocked == "": @@ -362,7 +362,7 @@ for blocker, software, origin, nodeinfo_url in rows: print(f"INFO: Checking {len(json.items())} entries from blocker='{blocker}',software='{software}' ...") for block_level, blocks in json.items(): # DEBUG: print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks)) - block_level = fba.tidyup(block_level) + block_level = fba.tidyup_domain(block_level) # DEBUG: print("DEBUG: AFTER-block_level:", block_level) if block_level == "": print("WARNING: block_level is empty, blocker:", blocker) @@ -372,7 +372,7 @@ for blocker, software, origin, nodeinfo_url in rows: for block in blocks: blocked, reason = block.values() # DEBUG: print("DEBUG: BEFORE blocked:", blocked) - blocked = fba.tidyup(blocked) + blocked = fba.tidyup_domain(blocked) # DEBUG: print("DEBUG: AFTER blocked:", blocked) if blocked == "": @@ -455,7 +455,7 @@ for blocker, software, origin, nodeinfo_url in rows: for peer in federation: blocked = peer["domain"].lower() # DEBUG: print("DEBUG: BEFORE blocked:", blocked) - blocked = fba.tidyup(blocked) + blocked = fba.tidyup_domain(blocked) # DEBUG: print("DEBUG: AFTER blocked:", blocked) if blocked == "": diff --git a/fetch_cs.py b/fetch_cs.py index 149acc0..94e4d68 100755 --- a/fetch_cs.py +++ b/fetch_cs.py @@ -40,14 +40,29 @@ def find_domains(tag: bs4.element.Tag) -> list: # DEBUG: print("DEBUG: Skipping element, no found") continue - domain = fba.tidyup(element.find("td").text) - reason = element.findAll("td")[1].text + domain = fba.tidyup_domain(element.find("td").text) + reason = fba.tidyup_reason(element.findAll("td")[1].text) # DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'") if fba.is_blacklisted(domain): print(f"WARNING: domain='{domain}' is blacklisted - skipped!") continue + elif domain == "gab.com/.ai, develop.gab.com": + print(f"DEBUG: Multiple domains detected in one row") + domains.append({ + "domain": "gab.com", + "reason": reason, + }) + domains.append({ + "domain": "gab.ai", + "reason": reason, + }) + domains.append({ + "domain": "develop.gab.com", + "reason": reason, + }) + continue elif not validators.domain(domain): print(f"WARNING: domain='{domain}' is not a valid domain - skipped!") continue -- 2.39.5