def set_all_cache_key(key: str, rows: list, value: any):
# NOISY-DEBUG: print(f"DEBUG: key='{key}',rows()={len(rows)},value[]={type(value)} - CALLED!")
- if not is_cache_initialized(key):
+ if type(key) != str:
+ raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ elif not is_cache_initialized(key):
# NOISY-DEBUG: print(f"DEBUG: Cache for key='{key}' not initialized.")
cache[key] = {}
# NOISY-DEBUG: print("DEBUG: EXIT!")
def set_cache_key(key: str, sub: str, value: any):
- if not is_cache_initialized(key):
+ if type(key) != str:
+ raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ elif type(sub) != str:
+ raise ValueError("Parameter sub[]='{type(sub)}' is not 'str'")
+ elif not is_cache_initialized(key):
print(f"WARNING: Bad method call, key='{key}' is not initialized yet.")
raise Exception(f"Cache for key='{key}' is not initialized, but function called")
cache[key][sub] = value
def is_cache_key_set(key: str, sub: str) -> bool:
- if not is_cache_initialized(key):
+ if type(key) != str:
+ raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ elif type(sub) != str:
+ raise ValueError("Parameter sub[]='{type(sub)}' is not 'str'")
+ elif not is_cache_initialized(key):
print(f"WARNING: Bad method call, key='{key}' is not initialized yet.")
raise Exception(f"Cache for key='{key}' is not initialized, but function called")
return sub in cache[key]
+##### Other functions #####
+
+def is_primitive(var: any) -> bool:
+ #print(f"DEBUG: var[]='{type(var)}' - CALLED!")
+ return type(var) in {int, str, float, bool}
+
+def set_instance_data(key: str, domain: str, value: any):
+ #print(f"DEBUG: key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!")
+ if type(key) != str:
+ raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ elif key == "":
+ raise ValueError(f"Parameter 'key' cannot be empty")
+ elif type(domain) != str:
+ raise ValueError("Parameter domain[]='{type(domain)}' is not 'str'")
+ elif domain == "":
+ raise ValueError(f"Parameter 'domain' cannot be empty")
+ elif not key in instance_data:
+ raise ValueError(f"key='{key}' not found in instance_data")
+ elif not is_primitive(value):
+ raise ValueError(f"value[]='{type(value)}' is not a primitive type")
+
+ # Set it
+ instance_data[key][domain] = value
+
def add_peers(rows: dict) -> list:
# DEBUG: print(f"DEBUG: rows()={len(rows)} - CALLED!")
peers = list()
def is_blacklisted(domain: str) -> bool:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
blacklisted = False
for peer in blacklist:
def remove_pending_error(domain: str):
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
try:
# Prevent updating any pending errors, nodeinfo was found
def get_hash(domain: str) -> str:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def update_last_blocked(domain: str):
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
- instance_data["last_blocked"][domain] = time.time()
+ set_instance_data("last_blocked", domain, time.time())
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
def has_pending_instance_data(domain: str) -> bool:
# DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
has_pending = False
for key in instance_data:
def update_instance_data(domain: str):
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print(f"DEBUG: Updating nodeinfo for domain='{domain}' ...")
sql_string = ''
fields.append(domain)
if sql_string == '':
- raise ValueError(f"WARNING: No fields have been set, but method invoked, domain='{domain}'")
+ raise ValueError(f"No fields have been set, but method invoked, domain='{domain}'")
# DEBUG: print(f"DEBUG: sql_string='{sql_string}',fields()={len(fields)}")
sql = "UPDATE instances SET" + sql_string + " last_status_code = NULL, last_error_details = NULL, last_updated = TIME() WHERE domain = ? LIMIT 1"
def log_error(domain: str, res: any):
# DEBUG: print("DEBUG: domain,res[]:", domain, type(res))
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
try:
# DEBUG: print("DEBUG: BEFORE res[]:", type(res))
def update_last_error(domain: str, res: any):
# DEBUG: print("DEBUG: domain,res[]:", domain, type(res))
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: BEFORE res[]:", type(res))
if isinstance(res, BaseException) or isinstance(res, json.JSONDecodeError):
# DEBUG: print("DEBUG: AFTER res[]:", type(res))
if type(res) is str:
# DEBUG: print(f"DEBUG: Setting last_error_details='{res}'");
- instance_data["last_status_code"][domain] = 999
- instance_data["last_error_details"][domain] = res
+ set_instance_data("last_status_code" , domain, 999)
+ set_instance_data("last_error_details", domain, res)
else:
# DEBUG: print(f"DEBUG: Setting last_error_details='{res.reason}'");
- instance_data["last_status_code"][domain] = res.status_code
- instance_data["last_error_details"][domain] = res.reason
+ set_instance_data("last_status_code" , domain, res.status_code)
+ set_instance_data("last_error_details", domain, res.reason)
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
def update_last_instance_fetch(domain: str):
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Updating last_instance_fetch for domain:", domain)
- instance_data["last_instance_fetch"][domain] = time.time()
+ set_instance_data("last_instance_fetch", domain, time.time())
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
def update_last_nodeinfo(domain: str):
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
- instance_data["last_nodeinfo"][domain] = time.time()
- instance_data["last_updated"][domain] = time.time()
+ set_instance_data("last_nodeinfo", domain, time.time())
+ set_instance_data("last_updated" , domain, time.time())
# Running pending updated
# DEBUG: print(f"DEBUG: Invoking update_instance_data({domain}) ...")
def get_peers(domain: str, software: str) -> list:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
elif type(software) != str and software != None:
- raise ValueError(f"WARNING: software[]={type(software)} is not 'str'")
+ raise ValueError(f"software[]={type(software)} is not 'str'")
# DEBUG: print(f"DEBUG: domain='{domain}',software='{software}' - CALLED!")
peers = list()
peers.append(row["host"])
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instance_data["total_peers"][domain] = len(peers)
+ set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instance_data["total_peers"][domain] = len(peers)
+ set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(e)}]:'{str(e)}'")
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instance_data["total_peers"][domain] = len(peers)
+ set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
update_last_error(domain, e)
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instance_data["total_peers"][domain] = len(peers)
+ set_instance_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
update_last_instance_fetch(domain)
def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
elif type(path) != str:
- raise ValueError(f"WARNING: path[]={type(path)} is not 'str'")
+ raise ValueError(f"path[]={type(path)} is not 'str'")
elif path == "":
- raise ValueError(f"WARNING: path cannot be empty")
+ raise ValueError(f"path cannot be empty")
elif type(parameter) != str:
- raise ValueError(f"WARNING: parameter[]={type(parameter)} is not 'str'")
+ raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
# DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
data = {}
def fetch_nodeinfo(domain: str, path: str = None) -> list:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Fetching nodeinfo from domain,path:", domain, path)
# DEBUG: print(f"DEBUG: res.ok={res.ok},res.status_code={res.status_code},data[]='{type(data)}'")
if res.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Success:", request)
- instance_data["detection_mode"][domain] = "STATIC_CHECK"
- instance_data["nodeinfo_url"][domain] = request
+ set_instance_data("detection_mode", domain, "STATIC_CHECK")
+ set_instance_data("nodeinfo_url" , domain, request)
break
elif res.ok and isinstance(data, list):
# DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
def fetch_wellknown_nodeinfo(domain: str) -> list:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
data = {}
# DEBUG: print("DEBUG: href,res.ok,res.status_code:", link["href"], res.ok, res.status_code)
if res.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
- instance_data["detection_mode"][domain] = "AUTO_DISCOVERY"
- instance_data["nodeinfo_url"][domain] = link["href"]
+ set_instance_data("detection_mode", domain, "AUTO_DISCOVERY")
+ set_instance_data("nodeinfo_url" , domain, link["href"])
break
else:
print("WARNING: Unknown 'rel' value:", domain, link["rel"])
def fetch_generator_from_path(domain: str, path: str = "/") -> str:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
elif type(path) != str:
- raise ValueError(f"WARNING: path[]={type(path)} is not 'str'")
+ raise ValueError(f"path[]={type(path)} is not 'str'")
elif path == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!")
software = None
# DEBUG: print("DEBUG: Found generator meta tag:", domain)
software = tidyup(generator.get("content"))
print(f"INFO: domain='{domain}' is generated by '{software}'")
- instance_data["detection_mode"][domain] = "GENERATOR"
+ set_instance_data("detection_mode", domain, "GENERATOR")
remove_pending_error(domain)
elif isinstance(site_name, bs4.element.Tag):
# DEBUG: print("DEBUG: Found property=og:site_name:", domain)
sofware = tidyup(site_name.get("content"))
print(f"INFO: domain='{domain}' has og:site_name='{software}'")
- instance_data["detection_mode"][domain] = "SITE_NAME"
+ set_instance_data("detection_mode", domain, "SITE_NAME")
remove_pending_error(domain)
except BaseException as e:
def determine_software(domain: str, path: str = None) -> str:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Determining software for domain,path:", domain, path)
software = None
def is_instance_registered(domain: str) -> bool:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# NOISY-DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
if not is_cache_initialized("is_registered"):
def add_instance(domain: str, origin: str, originator: str, path: str = None):
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
elif type(origin) != str and origin != None:
- raise ValueError(f"WARNING: origin[]={type(origin)} is not 'str'")
+ raise ValueError(f"origin[]={type(origin)} is not 'str'")
elif type(originator) != str:
- raise ValueError(f"WARNING: originator[]={type(originator)} is not 'str'")
+ raise ValueError(f"originator[]={type(originator)} is not 'str'")
elif originator == "":
- raise ValueError(f"WARNING: originator cannot be empty")
+ raise ValueError(f"originator cannot be empty")
# DEBUG: print("DEBUG: domain,origin,originator,path:", domain, origin, originator, path)
if not validators.domain(domain.split("/")[0]):
- print("WARNING: Bad domain name:", domain)
- raise
+ raise ValueError(f"Bad domain name='{domain}'")
elif origin is not None and not validators.domain(origin.split("/")[0]):
- print("WARNING: Bad origin name:", origin)
- raise
+ raise ValueError(f"Bad origin name='{origin}'")
software = determine_software(domain, path)
# DEBUG: print("DEBUG: Determined software:", software)
def get_mastodon_blocks(domain: str) -> dict:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
blocks = {
def get_friendica_blocks(domain: str) -> dict:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
blocks = []
# Prevents exceptions:
if blocklist is None:
- # DEBUG: print("DEBUG:Instance has no block list:", domain)
+ # DEBUG: print("DEBUG: Instance has no block list:", domain)
return {}
for line in blocklist.find("table").find_all("tr")[1:]:
def get_misskey_blocks(domain: str) -> dict:
if type(domain) != str:
- raise ValueError(f"WARNING: domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
- raise ValueError(f"WARNING: domain cannot be empty")
+ raise ValueError(f"Parameter 'domain' cannot be empty")
# DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
blocks = {
def tidyup(string: str) -> str:
if type(string) != str:
- raise ValueError(f"WARNING: string[]={type(string)} is not expected")
+ raise ValueError(f"Parameter string[]={type(string)} is not expected")
# some retards put their blocks in variable case
string = string.lower().strip()