From 6668e177bf4c9a7e69b5207767fc441f79fa3cb2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Fri, 2 Jun 2023 13:31:57 +0200 Subject: [PATCH] Continued: - encapsulated setting instance data in set_instance_data() - also introduced is_primitive() --- fba.py | 190 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 111 insertions(+), 79 deletions(-) diff --git a/fba.py b/fba.py index c8e56cb..b45a019 100644 --- a/fba.py +++ b/fba.py @@ -150,7 +150,9 @@ def is_cache_initialized(key: str) -> bool: def set_all_cache_key(key: str, rows: list, value: any): # NOISY-DEBUG: print(f"DEBUG: key='{key}',rows()={len(rows)},value[]={type(value)} - CALLED!") - if not is_cache_initialized(key): + if type(key) != str: + raise ValueError("Parameter key[]='{type(key)}' is not 'str'") + elif not is_cache_initialized(key): # NOISY-DEBUG: print(f"DEBUG: Cache for key='{key}' not initialized.") cache[key] = {} @@ -165,19 +167,51 @@ def set_all_cache_key(key: str, rows: list, value: any): # NOISY-DEBUG: print("DEBUG: EXIT!") def set_cache_key(key: str, sub: str, value: any): - if not is_cache_initialized(key): + if type(key) != str: + raise ValueError("Parameter key[]='{type(key)}' is not 'str'") + elif type(sub) != str: + raise ValueError("Parameter sub[]='{type(sub)}' is not 'str'") + elif not is_cache_initialized(key): print(f"WARNING: Bad method call, key='{key}' is not initialized yet.") raise Exception(f"Cache for key='{key}' is not initialized, but function called") cache[key][sub] = value def is_cache_key_set(key: str, sub: str) -> bool: - if not is_cache_initialized(key): + if type(key) != str: + raise ValueError("Parameter key[]='{type(key)}' is not 'str'") + elif type(sub) != str: + raise ValueError("Parameter sub[]='{type(sub)}' is not 'str'") + elif not is_cache_initialized(key): print(f"WARNING: Bad method call, key='{key}' is not initialized yet.") raise Exception(f"Cache for key='{key}' is not initialized, but function called") return sub in cache[key] +##### Other functions ##### + +def is_primitive(var: any) -> bool: + #print(f"DEBUG: var[]='{type(var)}' - CALLED!") + return type(var) in {int, str, float, bool} + +def set_instance_data(key: str, domain: str, value: any): + #print(f"DEBUG: key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!") + if type(key) != str: + raise ValueError("Parameter key[]='{type(key)}' is not 'str'") + elif key == "": + raise ValueError(f"Parameter 'key' cannot be empty") + elif type(domain) != str: + raise ValueError("Parameter domain[]='{type(domain)}' is not 'str'") + elif domain == "": + raise ValueError(f"Parameter 'domain' cannot be empty") + elif not key in instance_data: + raise ValueError(f"key='{key}' not found in instance_data") + elif not is_primitive(value): + raise ValueError(f"value[]='{type(value)}' is not a primitive type") + + # Set it + instance_data[key][domain] = value + def add_peers(rows: dict) -> list: # DEBUG: print(f"DEBUG: rows()={len(rows)} - CALLED!") peers = list() @@ -318,9 +352,9 @@ def strip_until(software: str, until: str) -> str: def is_blacklisted(domain: str) -> bool: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") blacklisted = False for peer in blacklist: @@ -331,9 +365,9 @@ def is_blacklisted(domain: str) -> bool: def remove_pending_error(domain: str): if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") try: # Prevent updating any pending errors, nodeinfo was found @@ -344,20 +378,20 @@ def remove_pending_error(domain: str): def get_hash(domain: str) -> str: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") return hashlib.sha256(domain.encode("utf-8")).hexdigest() def update_last_blocked(domain: str): if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Updating last_blocked for domain", domain) - instance_data["last_blocked"][domain] = time.time() + set_instance_data("last_blocked", domain, time.time()) # Running pending updated # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...") @@ -368,9 +402,9 @@ def update_last_blocked(domain: str): def has_pending_instance_data(domain: str) -> bool: # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!") if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") has_pending = False for key in instance_data: @@ -384,9 +418,9 @@ def has_pending_instance_data(domain: str) -> bool: def update_instance_data(domain: str): if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print(f"DEBUG: Updating nodeinfo for domain='{domain}' ...") sql_string = '' @@ -401,7 +435,7 @@ def update_instance_data(domain: str): fields.append(domain) if sql_string == '': - raise ValueError(f"WARNING: No fields have been set, but method invoked, domain='{domain}'") + raise ValueError(f"No fields have been set, but method invoked, domain='{domain}'") # DEBUG: print(f"DEBUG: sql_string='{sql_string}',fields()={len(fields)}") sql = "UPDATE instances SET" + sql_string + " last_status_code = NULL, last_error_details = NULL, last_updated = TIME() WHERE domain = ? LIMIT 1" @@ -435,9 +469,9 @@ def update_instance_data(domain: str): def log_error(domain: str, res: any): # DEBUG: print("DEBUG: domain,res[]:", domain, type(res)) if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") try: # DEBUG: print("DEBUG: BEFORE res[]:", type(res)) @@ -471,9 +505,9 @@ def log_error(domain: str, res: any): def update_last_error(domain: str, res: any): # DEBUG: print("DEBUG: domain,res[]:", domain, type(res)) if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: BEFORE res[]:", type(res)) if isinstance(res, BaseException) or isinstance(res, json.JSONDecodeError): @@ -482,12 +516,12 @@ def update_last_error(domain: str, res: any): # DEBUG: print("DEBUG: AFTER res[]:", type(res)) if type(res) is str: # DEBUG: print(f"DEBUG: Setting last_error_details='{res}'"); - instance_data["last_status_code"][domain] = 999 - instance_data["last_error_details"][domain] = res + set_instance_data("last_status_code" , domain, 999) + set_instance_data("last_error_details", domain, res) else: # DEBUG: print(f"DEBUG: Setting last_error_details='{res.reason}'"); - instance_data["last_status_code"][domain] = res.status_code - instance_data["last_error_details"][domain] = res.reason + set_instance_data("last_status_code" , domain, res.status_code) + set_instance_data("last_error_details", domain, res.reason) # Running pending updated # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...") @@ -499,12 +533,12 @@ def update_last_error(domain: str, res: any): def update_last_instance_fetch(domain: str): if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain) - instance_data["last_instance_fetch"][domain] = time.time() + set_instance_data("last_instance_fetch", domain, time.time()) # Running pending updated # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...") @@ -514,13 +548,13 @@ def update_last_instance_fetch(domain: str): def update_last_nodeinfo(domain: str): if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain) - instance_data["last_nodeinfo"][domain] = time.time() - instance_data["last_updated"][domain] = time.time() + set_instance_data("last_nodeinfo", domain, time.time()) + set_instance_data("last_updated" , domain, time.time()) # Running pending updated # DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...") @@ -530,11 +564,11 @@ def update_last_nodeinfo(domain: str): def get_peers(domain: str, software: str) -> list: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") elif type(software) != str and software != None: - raise ValueError(f"WARNING: software[]={type(software)} is not 'str'") + raise ValueError(f"software[]={type(software)} is not 'str'") # DEBUG: print(f"DEBUG: domain='{domain}',software='{software}' - CALLED!") peers = list() @@ -597,7 +631,7 @@ def get_peers(domain: str, software: str) -> list: peers.append(row["host"]) # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") - instance_data["total_peers"][domain] = len(peers) + set_instance_data("total_peers", domain, len(peers)) # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") update_last_instance_fetch(domain) @@ -629,7 +663,7 @@ def get_peers(domain: str, software: str) -> list: print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'") # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") - instance_data["total_peers"][domain] = len(peers) + set_instance_data("total_peers", domain, len(peers)) # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") update_last_instance_fetch(domain) @@ -671,7 +705,7 @@ def get_peers(domain: str, software: str) -> list: print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'") # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") - instance_data["total_peers"][domain] = len(peers) + set_instance_data("total_peers", domain, len(peers)) # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") update_last_instance_fetch(domain) @@ -713,7 +747,7 @@ def get_peers(domain: str, software: str) -> list: update_last_error(domain, e) # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") - instance_data["total_peers"][domain] = len(peers) + set_instance_data("total_peers", domain, len(peers)) # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") update_last_instance_fetch(domain) @@ -723,15 +757,15 @@ def get_peers(domain: str, software: str) -> list: def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") elif type(path) != str: - raise ValueError(f"WARNING: path[]={type(path)} is not 'str'") + raise ValueError(f"path[]={type(path)} is not 'str'") elif path == "": - raise ValueError(f"WARNING: path cannot be empty") + raise ValueError(f"path cannot be empty") elif type(parameter) != str: - raise ValueError(f"WARNING: parameter[]={type(parameter)} is not 'str'") + raise ValueError(f"parameter[]={type(parameter)} is not 'str'") # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers) data = {} @@ -752,9 +786,9 @@ def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = def fetch_nodeinfo(domain: str, path: str = None) -> list: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Fetching nodeinfo from domain,path:", domain, path) @@ -788,8 +822,8 @@ def fetch_nodeinfo(domain: str, path: str = None) -> list: # DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'") if res.ok and isinstance(data, dict): # DEBUG: print("DEBUG: Success:", request) - instance_data["detection_mode"][domain] = "STATIC_CHECK" - instance_data["nodeinfo_url"][domain] = request + set_instance_data("detection_mode", domain, "STATIC_CHECK") + set_instance_data("nodeinfo_url" , domain, request) break elif res.ok and isinstance(data, list): # DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'") @@ -809,9 +843,9 @@ def fetch_nodeinfo(domain: str, path: str = None) -> list: def fetch_wellknown_nodeinfo(domain: str) -> list: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain) data = {} @@ -836,8 +870,8 @@ def fetch_wellknown_nodeinfo(domain: str) -> list: # DEBUG: print("DEBUG: href,res.ok,res.status_code:", link["href"], res.ok, res.status_code) if res.ok and isinstance(data, dict): # DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data)) - instance_data["detection_mode"][domain] = "AUTO_DISCOVERY" - instance_data["nodeinfo_url"][domain] = link["href"] + set_instance_data("detection_mode", domain, "AUTO_DISCOVERY") + set_instance_data("nodeinfo_url" , domain, link["href"]) break else: print("WARNING: Unknown 'rel' value:", domain, link["rel"]) @@ -854,13 +888,13 @@ def fetch_wellknown_nodeinfo(domain: str) -> list: def fetch_generator_from_path(domain: str, path: str = "/") -> str: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") elif type(path) != str: - raise ValueError(f"WARNING: path[]={type(path)} is not 'str'") + raise ValueError(f"path[]={type(path)} is not 'str'") elif path == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!") software = None @@ -883,13 +917,13 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: # DEBUG: print("DEBUG: Found generator meta tag:", domain) software = tidyup(generator.get("content")) print(f"INFO: domain='{domain}' is generated by '{software}'") - instance_data["detection_mode"][domain] = "GENERATOR" + set_instance_data("detection_mode", domain, "GENERATOR") remove_pending_error(domain) elif isinstance(site_name, bs4.element.Tag): # DEBUG: print("DEBUG: Found property=og:site_name:", domain) sofware = tidyup(site_name.get("content")) print(f"INFO: domain='{domain}' has og:site_name='{software}'") - instance_data["detection_mode"][domain] = "SITE_NAME" + set_instance_data("detection_mode", domain, "SITE_NAME") remove_pending_error(domain) except BaseException as e: @@ -924,9 +958,9 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: def determine_software(domain: str, path: str = None) -> str: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Determining software for domain,path:", domain, path) software = None @@ -1081,9 +1115,9 @@ def block_instance(blocker: str, blocked: str, reason: str, block_level: str): def is_instance_registered(domain: str) -> bool: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # NOISY-DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!") if not is_cache_initialized("is_registered"): @@ -1105,23 +1139,21 @@ def is_instance_registered(domain: str) -> bool: def add_instance(domain: str, origin: str, originator: str, path: str = None): if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") elif type(origin) != str and origin != None: - raise ValueError(f"WARNING: origin[]={type(origin)} is not 'str'") + raise ValueError(f"origin[]={type(origin)} is not 'str'") elif type(originator) != str: - raise ValueError(f"WARNING: originator[]={type(originator)} is not 'str'") + raise ValueError(f"originator[]={type(originator)} is not 'str'") elif originator == "": - raise ValueError(f"WARNING: originator cannot be empty") + raise ValueError(f"originator cannot be empty") # DEBUG: print("DEBUG: domain,origin,originator,path:", domain, origin, originator, path) if not validators.domain(domain.split("/")[0]): - print("WARNING: Bad domain name:", domain) - raise + raise ValueError(f"Bad domain name='{domain}'") elif origin is not None and not validators.domain(origin.split("/")[0]): - print("WARNING: Bad origin name:", origin) - raise + raise ValueError(f"Bad origin name='{origin}'") software = determine_software(domain, path) # DEBUG: print("DEBUG: Determined software:", software) @@ -1198,9 +1230,9 @@ def send_bot_post(instance: str, blocks: dict): def get_mastodon_blocks(domain: str) -> dict: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain) blocks = { @@ -1247,9 +1279,9 @@ def get_mastodon_blocks(domain: str) -> dict: def get_friendica_blocks(domain: str) -> dict: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain) blocks = [] @@ -1268,7 +1300,7 @@ def get_friendica_blocks(domain: str) -> dict: # Prevents exceptions: if blocklist is None: - # DEBUG: print("DEBUG:Instance has no block list:", domain) + # DEBUG: print("DEBUG: Instance has no block list:", domain) return {} for line in blocklist.find("table").find_all("tr")[1:]: @@ -1285,9 +1317,9 @@ def get_friendica_blocks(domain: str) -> dict: def get_misskey_blocks(domain: str) -> dict: if type(domain) != str: - raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") elif domain == "": - raise ValueError(f"WARNING: domain cannot be empty") + raise ValueError(f"Parameter 'domain' cannot be empty") # DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain) blocks = { @@ -1405,7 +1437,7 @@ def get_misskey_blocks(domain: str) -> dict: def tidyup(string: str) -> str: if type(string) != str: - raise ValueError(f"WARNING: string[]={type(string)} is not expected") + raise ValueError(f"Parameter string[]={type(string)} is not expected") # some retards put their blocks in variable case string = string.lower().strip() -- 2.39.5