# written to database. Both arrays must be filled at the same time or else
# update_data() will fail
_pending = {
- # Detection mode: 'AUTO_DISCOVERY', 'STATIC_CHECKS' or 'GENERATOR'
+ # Detection mode
# NULL means all detection methods have failed (maybe still reachable instance)
"detection_mode" : {},
# Found nodeinfo URL
"last_status_code" : {},
# Last error details
"last_error_details" : {},
+ # Wether obfuscation has been used
+ "has_obfuscation" : {},
}
def _set_data(key: str, domain: str, value: any):
logger.debug("EXIT!")
def has_pending(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
has = False
return has
def update_data(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
if not has_pending(domain):
raise Exception(f"domain='{domain}' has no pending instance data, but function invoked")
def add(domain: str, origin: str, command: str, path: str = None, software: str = None):
logger.debug("domain='%s',origin='%s',command='%s',path='%s',software='%s' - CALLED!", domain, origin, command, path, software)
domain_helper.raise_on(domain)
+
if not isinstance(origin, str) and origin is not None:
raise ValueError(f"origin[]='{type(origin)}' is not 'str'")
elif origin == "":
raise ValueError(f"Bad origin name='{origin}'")
elif blacklist.is_blacklisted(domain):
raise Exception(f"domain='{domain}' is blacklisted, but method invoked")
- elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (software == "lemmy" and domain.find("/c/") > 0):
+ elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
raise Exception(f"domain='{domain}' is a single user")
+ elif domain.find("/tag/") > 0:
+ raise Exception(f"domain='{domain}' is a tag")
if software is None:
try:
logger.debug("Marking domain='%s' as registered.", domain)
cache.set_sub_key("is_registered", domain, True)
+ logger.debug("Checking if domain='%s' has pending updates ...", domain)
if has_pending(domain):
- logger.debug("domain='%s' has pending nodeinfo being updated ...", domain)
+ logger.debug("Flushing updates for domain='%s' ...", domain)
update_data(domain)
logger.debug("EXIT!")
def set_last_nodeinfo(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- logger.debug("Updating last_nodeinfo for domain:", domain)
+ logger.debug("Updating last_nodeinfo for domain='%s'", domain)
_set_data("last_nodeinfo", domain, time.time())
- # Running pending updated
- logger.debug("Invoking update_data(%s) ...", domain)
- update_data(domain)
-
logger.debug("EXIT!")
def set_last_error(domain: str, error: dict):
logger.debug("EXIT!")
+def set_success(domain: str):
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
+
+ # Set both to success
+ _set_data("last_status_code" , domain, 200)
+ _set_data("last_error_details", domain, None)
+
+ logger.debug("EXIT!")
+
def is_registered(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
if not cache.key_exists("is_registered"):
logger.debug("Cache for 'is_registered' not initialized, fetching all rows ...")
database.cursor.execute("SELECT domain FROM instances")
logger.debug("registered='%s' - EXIT!", registered)
return registered
-def is_recent(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+def is_recent(domain: str, column: str = "last_instance_fetch") -> bool:
+ logger.debug("domain='%s',column='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- if not is_registered(domain):
- logger.debug(f"domain='{domain}' is not registered, returning False - EXIT!")
+
+ if not isinstance(column, str):
+ raise ValueError(f"Parameter column[]='{type(column)}' is not 'str'")
+ elif column not in ["last_instance_fetch", "last_blocked"]:
+ raise ValueError(f"Parameter column='{column}' is not expected")
+ elif not is_registered(domain):
+ logger.debug("domain='%s' is not registered, returning False - EXIT!", domain)
return False
# Query database
- database.cursor.execute("SELECT last_instance_fetch FROM instances WHERE domain = ? LIMIT 1", [domain])
+ database.cursor.execute(f"SELECT {column} FROM instances WHERE domain = ? LIMIT 1", [domain])
# Fetch row
fetched = database.cursor.fetchone()[0]
logger.debug("recently='%s' - EXIT!", recently)
return recently
-def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple:
+def deobfuscate(char: str, domain: str, blocked_hash: str = None) -> tuple:
logger.debug("char='%s',domain='%s',blocked_hash='%s' - CALLED!", char, domain, blocked_hash)
if not isinstance(domain, str):
if row is None:
logger.debug("blocked_hash='%s' not found, trying domain='%s' ...", blocked_hash, domain)
- return deobscure(char, domain)
+ return deobfuscate(char, domain)
else:
logger.debug("Looking up domain='%s' ...", domain)
database.cursor.execute(
return row
def set_last_blocked(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
# Set timestamp
logger.debug("EXIT!")
def set_last_instance_fetch(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
# Set timestamp
logger.debug("EXIT!")
def set_total_peers(domain: str, peers: list):
- logger.debug(f"domain='{domain}',peers()={len(peers)} - CALLED!")
+ logger.debug("domain='%s',peers()=%d - CALLED!", domain, len(peers))
domain_helper.raise_on(domain)
+
if not isinstance(peers, list):
- raise ValueError(f"Parameter peers[]='{type(peers)}' is not 'list'")
+ raise ValueError(f"Parameter peers[]='{type(peers)}' is not 'list': '%s'")
# Set timestamp
_set_data("total_peers", domain, len(peers))
logger.debug("EXIT!")
def set_nodeinfo_url(domain: str, url: str):
- logger.debug(f"domain='{domain}',url='{url}' - CALLED!")
+ logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
domain_helper.raise_on(domain)
+
if not isinstance(url, str):
raise ValueError("Parameter url[]='{type(url)}' is not 'list'")
elif url == "":
logger.debug("EXIT!")
def set_detection_mode(domain: str, mode: str):
- logger.debug(f"domain='{domain}',mode='{mode}' - CALLED!")
+ logger.debug("domain='%s',mode='%s' - CALLED!", domain, mode)
domain_helper.raise_on(domain)
+
if not isinstance(mode, str):
raise ValueError("Parameter mode[]='{type(mode)}' is not 'list'")
elif mode == "":
# Set timestamp
_set_data("detection_mode", domain, mode)
logger.debug("EXIT!")
+
+def set_has_obfuscation(domain: str, status: bool):
+ logger.debug("domain(%d)='%s',status='%s' - CALLED!", len(domain), domain, status)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(status, bool):
+ raise ValueError(f"Parameter status[]='{type(status)}' is not 'bool'")
+
+ # Set timestamp
+ _set_data("has_obfuscation", domain, status)
+ logger.debug("EXIT!")