# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+import logging
import json
-import sys
import time
import requests
import validators
-from fba import blacklist
-from fba import config
-from fba import fba
-from fba import federation
-from fba import network
+from fba import database
+from fba import utils
+from fba.helpers import blacklist
from fba.helpers import cache
+from fba.helpers import config
+from fba.helpers import domain as domain_helper
+from fba.helpers import tidyup
+
+from fba.http import federation
+from fba.http import network
from fba.models import error_log
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+#logger.setLevel(logging.DEBUG)
+
# Found info from node, such as nodeinfo URL, detection mode that needs to be
# written to database. Both arrays must be filled at the same time or else
-# update_data() will fail
+# update() will fail
_pending = {
- # Detection mode: 'AUTO_DISCOVERY', 'STATIC_CHECKS' or 'GENERATOR'
+ # Detection mode
# NULL means all detection methods have failed (maybe still reachable instance)
"detection_mode" : {},
# Found nodeinfo URL
"nodeinfo_url" : {},
# Found total peers
"total_peers" : {},
+ # Found total blocks
+ "total_blocks" : {},
+ # Obfuscated domains
+ "obfuscated_blocks" : {},
# Last fetched instances
"last_instance_fetch": {},
# Last updated
"last_status_code" : {},
# Last error details
"last_error_details" : {},
+ # Wether obfuscation has been used
+ "has_obfuscation" : {},
+ # Determined software
+ "software" : {},
}
def _set_data(key: str, domain: str, value: any):
- # DEBUG: print(f"DEBUG: key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!")
+ logger.debug("key='%s',domain='%s',value[]='%s' - CALLED!", key, domain, type(value))
+ domain_helper.raise_on(domain)
if not isinstance(key, str):
- raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ raise ValueError(f"Parameter key[]='{type(key)}' is not of type 'str'")
elif key == "":
raise ValueError("Parameter 'key' is empty")
- elif not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
elif not key in _pending:
raise ValueError(f"key='{key}' not found in _pending")
- elif not fba.is_primitive(value):
+ elif not utils.is_primitive(value):
raise ValueError(f"value[]='{type(value)}' is not a primitive type")
# Set it
_pending[key][domain] = value
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
def has_pending(domain: str) -> bool:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
has = False
for key in _pending:
- # DEBUG: print(f"DEBUG: key='{key}',domain='{domain}',_pending[key]()='{len(_pending[key])}'")
+ logger.debug("key='%s',domain='%s',_pending[key]()=%d", key, domain, len(_pending[key]))
if domain in _pending[key]:
has = True
break
- # DEBUG: print(f"DEBUG: has='{has}' - EXIT!")
+ logger.debug("has='%s' - EXIT!", has)
return has
-def update_data(domain: str):
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not has_pending(domain):
+def update(domain: str):
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
+ if not has_pending(domain):
raise Exception(f"domain='{domain}' has no pending instance data, but function invoked")
+ elif not is_registered(domain):
+ raise Exception(f"domain='{domain}' cannot be updated while not being registered")
- # DEBUG: print(f"DEBUG: Updating instance data for domain='{domain}' ...")
+ logger.debug("Updating instance data for domain='%s' ...", domain)
sql_string = ""
fields = list()
for key in _pending:
- # DEBUG: print("DEBUG: key:", key)
+ logger.debug("Checking key='%s',domain='%s'", key, domain)
if domain in _pending[key]:
- # DEBUG: print(f"DEBUG: Adding '{_pending[key][domain]}' for key='{key}' ...")
+ logger.debug("Adding '%s' for key='%s' ...", _pending[key][domain], key)
fields.append(_pending[key][domain])
sql_string += f" {key} = ?,"
- # DEBUG: print(f"DEBUG: sql_string()={len(sql_string)}")
+ logger.debug("sql_string(%d)='%s'", len(sql_string), sql_string)
if sql_string == "":
- raise ValueError(f"No fields have been set, but method invoked, domain='{domain}'")
+ raise ValueError(f"No fields have been set, but function invoked, domain='{domain}'")
# Set last_updated to current timestamp
fields.append(time.time())
# For WHERE statement
+ logger.debug("Setting domain='%s' for WHERE statement ...", domain)
fields.append(domain)
- # DEBUG: print(f"DEBUG: sql_string='{sql_string}',fields()={len(fields)}")
+ logger.debug("sql_string='%s',fields()=%d", sql_string, len(fields))
sql_string = "UPDATE instances SET" + sql_string + " last_updated = ? WHERE domain = ? LIMIT 1"
- # DEBUG: print("DEBUG: sql_string:", sql_string)
- try:
- # DEBUG: print("DEBUG: Executing SQL:", sql_string)
- fba.cursor.execute(sql_string, fields)
+ logger.debug("Executing SQL: sql_string='%s',fields()=%d", sql_string, len(fields))
+ database.cursor.execute(sql_string, fields)
- # DEBUG: print(f"DEBUG: Success! (rowcount={fba.cursor.rowcount })")
- if fba.cursor.rowcount == 0:
- # DEBUG: print(f"DEBUG: Did not update any rows: domain='{domain}',fields()={len(fields)} - EXIT!")
- return
+ logger.debug("rowcount=%d", database.cursor.rowcount)
+ if database.cursor.rowcount == 0:
+ raise Exception(f"Did not update any rows: domain='{domain}',fields()={len(fields)}")
- # DEBUG: print("DEBUG: Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
- # DEBUG: print(f"DEBUG: Deleting _pending for domain='{domain}'")
- for key in _pending:
- # DEBUG: print(f"DEBUG: domain='{domain}',key='{key}'")
- if domain in _pending[key]:
- del _pending[key][domain]
-
- except BaseException as exception:
- print(f"ERROR: failed SQL query: domain='{domain}',sql_string='{sql_string}',exception[{type(exception)}]:'{str(exception)}'")
- sys.exit(255)
+ logger.debug("Deleting _pending for domain='%s'", domain)
+ for key in _pending:
+ logger.debug("domain='%s',key='%s'", domain, key)
+ if domain in _pending[key]:
+ logger.debug("Deleting key='%s',domain='%s' ...", key, domain)
+ del _pending[key][domain]
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
def add(domain: str, origin: str, command: str, path: str = None, software: str = None):
- # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not isinstance(origin, str) and origin is not None:
- raise ValueError(f"origin[]='{type(origin)}' is not 'str'")
+ logger.debug("domain='%s',origin='%s',command='%s',path='%s',software='%s' - CALLED!", domain, origin, command, path, software)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(origin, str) and origin is not None:
+ raise ValueError(f"origin[]='{type(origin)}' is not of type 'str'")
elif origin == "":
raise ValueError("Parameter 'origin' is empty")
elif not isinstance(command, str):
- raise ValueError(f"command[]='{type(command)}' is not 'str'")
+ raise ValueError(f"command[]='{type(command)}' is not of type 'str'")
elif command == "":
raise ValueError("Parameter 'command' is empty")
- elif not validators.domain(domain.split("/")[0]):
- raise ValueError(f"Bad domain name='{domain}'")
elif not isinstance(path, str) and path is not None:
- raise ValueError(f"path[]='{type(path)}' is not 'str'")
+ raise ValueError(f"path[]='{type(path)}' is not of type 'str'")
elif path == "":
raise ValueError("Parameter 'path' is empty")
elif not isinstance(software, str) and software is not None:
- raise ValueError(f"software[]='{type(software)}' is not 'str'")
+ raise ValueError(f"software[]='{type(software)}' is not of type 'str'")
elif software == "":
raise ValueError("Parameter 'software' is empty")
- elif domain.endswith(".arpa"):
- raise ValueError(f"Please don't crawl .arpa domains: domain='{domain}'")
elif origin is not None and not validators.domain(origin.split("/")[0]):
raise ValueError(f"Bad origin name='{origin}'")
elif blacklist.is_blacklisted(domain):
- raise Exception(f"domain='{domain}' is blacklisted, but method invoked")
- elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (software == "lemmy" and domain.find("/c/") > 0):
+ raise Exception(f"domain='{domain}' is blacklisted, but function invoked")
+ elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
raise Exception(f"domain='{domain}' is a single user")
+ elif domain.find("/tag/") > 0:
+ raise Exception(f"domain='{domain}' is a tag")
if software is None:
try:
- # DEBUG: print("DEBUG: domain,origin,command,path:", domain, origin, command, path)
+ logger.debug("domain='%s',origin='%s',command='%s',path='%s'", domain, origin, command, path)
software = federation.determine_software(domain, path)
except network.exceptions as exception:
- print(f"WARNING Exception '{type(exception)}' during determining software type")
+ logger.warning("Exception '%s' during determining software type, domain='%s'", type(exception), domain)
+ set_last_error(domain, exception)
- # DEBUG: print("DEBUG: Determined software:", software)
+ logger.debug("Determined software='%s'", software)
if software == "lemmy" and domain.find("/c/") > 0:
domain = domain.split("/c/")[0]
if is_registered(domain):
- print(f"WARNING: domain='{domain}' already registered after cutting off user part. - EXIT!")
+ logger.warning("domain='%s' already registered after cutting off user part. - EXIT!", domain)
return
- print(f"INFO: Adding instance domain='{domain}' (origin='{origin}',software='{software}')")
- fba.cursor.execute(
+ logger.info("Adding instance domain='%s',origin='%s',software='%s',command='%s'", domain, origin, software, command)
+ database.cursor.execute(
"INSERT INTO instances (domain, origin, command, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
(
domain,
origin,
command,
- fba.get_hash(domain),
+ utils.get_hash(domain),
software,
time.time()
),
)
+ logger.debug("Marking domain='%s' as registered.", domain)
cache.set_sub_key("is_registered", domain, True)
+ logger.debug("Checking if domain='%s' has pending updates ...", domain)
if has_pending(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo being updated ...")
- _set_data("last_status_code" , domain, None)
- _set_data("last_error_details", domain, None)
- update_data(domain)
+ logger.debug("Flushing updates for domain='%s' ...", domain)
+ update(domain)
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
def set_last_nodeinfo(domain: str):
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
- # DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
+ logger.debug("Updating last_nodeinfo for domain='%s'", domain)
_set_data("last_nodeinfo", domain, time.time())
- _set_data("last_updated" , domain, time.time())
-
- # Running pending updated
- # DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
- update_data(domain)
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
def set_last_error(domain: str, error: dict):
- # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+ logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error))
+ domain_helper.raise_on(domain)
- # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
+ logger.debug("error[]='%s' - BEFORE!", type(error))
if isinstance(error, (BaseException, json.decoder.JSONDecodeError)):
error = f"error[{type(error)}]='{str(error)}'"
- # DEBUG: print("DEBUG: AFTER error[]:", type(error))
+ logger.debug("error[]='%s' - AFTER!", type(error))
if isinstance(error, str):
- # DEBUG: print(f"DEBUG: Setting last_error_details='{error}'")
+ logger.debug("Setting last_error_details='%s' (str)", error)
_set_data("last_status_code" , domain, 999)
_set_data("last_error_details", domain, error if error != "" else None)
elif isinstance(error, requests.models.Response):
- # DEBUG: print(f"DEBUG: Setting last_error_details='{error.reason}'")
+ logger.debug("Setting last_error_details='%s' (Response)", error.reason)
_set_data("last_status_code" , domain, error.status_code)
_set_data("last_error_details", domain, error.reason if error.reason != "" else None)
elif not isinstance(error, dict):
raise KeyError(f"Cannot handle keys in error[{type(error)}]='{error}'")
elif "status_code" in error and "error_message" in error:
- # DEBUG: print(f"DEBUG: Setting last_error_details='{error['error_message']}'")
+ logger.debug("Setting last_error_details='%s' (error_message)", error['error_message'])
_set_data("last_status_code" , domain, error["status_code"])
_set_data("last_error_details", domain, error["error_message"] if error["error_message"] != "" else None)
elif "json" in error and "error" in error["json"]:
+ logger.debug("Setting last_error_details='%s' (json,error)", error["json"]["error"])
_set_data("last_status_code" , domain, error["status_code"])
_set_data("last_error_details", domain, error["json"]["error"] if error["json"]["error"] != "" else None)
- # Running pending updated
- # DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
- update_data(domain)
-
+ logger.debug("Invoking error_log.add(domain='%s',error[]='%s'", domain, type(error))
error_log.add(domain, error)
- # DEBUG: print("DEBUG: EXIT!")
-def is_registered(domain: str) -> bool:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+ logger.debug("EXIT!")
+
+def set_success(domain: str):
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
+
+ # Set both to success
+ _set_data("last_status_code" , domain, 200)
+ _set_data("last_error_details", domain, None)
+
+ logger.debug("EXIT!")
+
+def is_registered(domain: str, skip_raise = False) -> bool:
+ logger.debug("domain='%s',skip_raise='%s' - CALLED!", domain, skip_raise)
+ if not isinstance(skip_raise, bool):
+ raise ValueError(f"skip_raise[]='{type(skip_raise)}' is not type of 'bool'")
+
+ if not skip_raise:
+ domain_helper.raise_on(domain)
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
+ logger.debug("domain='%s' - CALLED!", domain)
if not cache.key_exists("is_registered"):
- # DEBUG: print("DEBUG: Cache for 'is_registered' not initialized, fetching all rows ...")
- fba.cursor.execute("SELECT domain FROM instances")
+ logger.debug("Cache for 'is_registered' not initialized, fetching all rows ...")
+ database.cursor.execute("SELECT domain FROM instances")
# Check Set all
- cache.set_all("is_registered", fba.cursor.fetchall(), True)
+ cache.set_all("is_registered", database.cursor.fetchall(), True)
# Is cache found?
registered = cache.sub_key_exists("is_registered", domain)
- # DEBUG: print(f"DEBUG: registered='{registered}' - EXIT!")
+ logger.debug("registered='%s' - EXIT!", registered)
return registered
-def is_recent(domain: str) -> bool:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+def is_recent(domain: str, column: str = "last_instance_fetch") -> bool:
+ logger.debug("domain='%s',column='%s' - CALLED!", domain, column)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(column, str):
+ raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+ elif not column.startswith("last_"):
+ raise ValueError(f"Parameter column='{column}' is not expected")
elif not is_registered(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is not registered, returning False - EXIT!")
+ logger.debug("domain='%s' is not registered, returning False - EXIT!", domain)
return False
+ key = "recheck_instance"
+ if column == "last_blocked":
+ key = "recheck_block"
+
# Query database
- fba.cursor.execute("SELECT last_instance_fetch FROM instances WHERE domain = ? LIMIT 1", [domain])
+ database.cursor.execute(f"SELECT {column} FROM instances WHERE domain = ? LIMIT 1", [domain])
# Fetch row
- fetched = fba.cursor.fetchone()[0]
+ row = database.cursor.fetchone()
+
+ fetched = float(row[column]) if row[column] is not None else 0.0
- # DEBUG: print(f"DEBUG: fetched[{type(fetched)}]='{fetched}'")
- recently = isinstance(fetched, float) and time.time() - fetched <= config.get("recheck_instance")
+ diff = (time.time() - fetched)
- # DEBUG: print(f"DEBUG: recently='{recently}' - EXIT!")
+ logger.debug("fetched[%s]='%s',key='%s',diff=%f", type(fetched), fetched, key, diff)
+ recently = bool(diff < config.get(key))
+
+ logger.debug("recently='%s' - EXIT!", recently)
return recently
-def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple:
- # DEBUG: print(f"DEBUG: char='{char}',domain='{domain}',blocked_hash='{blocked_hash}' - CALLED!")
+def deobfuscate(char: str, domain: str, blocked_hash: str = None) -> tuple:
+ logger.debug("char='%s',domain='%s',blocked_hash='%s' - CALLED!", char, domain, blocked_hash)
+
if not isinstance(char, str):
- raise ValueError(f"Parameter char[]='{type(char)}' is not 'str'")
+ raise ValueError(f"Parameter char[]='{type(char)}' is not of type 'str'")
elif char == "":
raise ValueError("Parameter 'char' is empty")
+ elif not char in domain:
+ raise ValueError(f"char='{char}' not found in domain='{domain}' but function invoked")
elif not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ raise ValueError(f"Parameter domain[]='{type(domain)}'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
elif not isinstance(blocked_hash, str) and blocked_hash is not None:
- raise ValueError(f"Parameter blocked_hash[]='{type(blocked_hash)}' is not 'str'")
+ raise ValueError(f"Parameter blocked_hash[]='{type(blocked_hash)}' is not of type 'str'")
+ # Init row
+ row = None
+
+ logger.debug("blocked_hash[]='%s'", type(blocked_hash))
if isinstance(blocked_hash, str):
- # DEBUG: print(f"DEBUG: Looking up blocked_hash='{blocked_hash}' ...")
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash]
+ logger.debug("Looking up blocked_hash='%s',domain='%s' ...", blocked_hash, domain)
+ database.cursor.execute(
+ "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? OR domain LIKE ? LIMIT 1", [blocked_hash, domain.replace(char, "_")]
)
- row = fba.cursor.fetchone()
- # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+ row = database.cursor.fetchone()
+ logger.debug("row[]='%s'", type(row))
if row is None:
- # DEBUG: print(f"DEBUG: blocked_hash='{blocked_hash}' not found, trying domain='{domain}' ...")
- return deobscure(char, domain)
- else:
- # DEBUG: print(f"DEBUG: Looking up domain='{domain}' ...")
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [domain.replace(char, "_")]
+ logger.debug("blocked_hash='%s' not found, trying domain='%s' ...", blocked_hash, domain)
+ return deobfuscate(char, domain)
+ elif not domain.startswith("*."):
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = tidyup.domain(domain)
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if domain == "":
+ logger.warning("domain is empty after tidyup - EXIT!")
+ return None
+
+ search = domain.replace(char, "_")
+
+ logger.debug("Looking up domain='%s',search='%s' ...", domain, search)
+ database.cursor.execute(
+ "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? OR 'https://' || domain LIKE ? ORDER BY rowid LIMIT 1", [search, search]
)
- row = fba.cursor.fetchone()
- # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
+ row = database.cursor.fetchone()
+ logger.debug("row[]='%s'", type(row))
- # DEBUG: print(f"DEBUG: row[]='{type(row)}' - EXIT!")
+ logger.debug("row[]='%s' - EXIT!", type(row))
return row
-def set_last_blocked (domain: str):
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+def set_last_blocked(domain: str):
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
# Set timestamp
_set_data("last_blocked", domain, time.time())
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
-def set_last_instance_fetch (domain: str):
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
+def set_last_instance_fetch(domain: str):
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
# Set timestamp
_set_data("last_instance_fetch", domain, time.time())
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
-def set_total_peers (domain: str, peers: list):
- # DEBUG: print(f"DEBUG: domain='{domain}',peers()={len(peers)} - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not isinstance(peers, list):
- raise ValueError("Parameter peers[]='{type(peers)}' is not 'list'")
+def set_total_peers(domain: str, peers: list):
+ logger.debug("domain='%s',peers()=%d - CALLED!", domain, len(peers))
+ domain_helper.raise_on(domain)
+
+ if not isinstance(peers, list):
+ raise ValueError(f"Parameter peers[]='{type(peers)}' is not of type 'list'")
# Set timestamp
_set_data("total_peers", domain, len(peers))
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
-def set_nodeinfo_url (domain: str, url: list):
- # DEBUG: print(f"DEBUG: domain='{domain}',url='{url}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not isinstance(url, str):
- raise ValueError("Parameter url[]='{type(url)}' is not 'list'")
- elif url == "":
- raise ValueError("Parameter 'url' is empty")
+def set_total_blocks(domain: str, blocks: list):
+ logger.debug("domain='%s',blocks()=%d - CALLED!", domain, len(blocks))
+ domain_helper.raise_on(domain)
+
+ if not isinstance(blocks, list):
+ raise ValueError(f"Parameter blocks[]='{type(blocks)}' is not of type 'list'")
# Set timestamp
- _set_data("nodeinfo_url", domain, url)
- # DEBUG: print("DEBUG: EXIT!")
+ _set_data("total_blocks", domain, len(blocks))
+ logger.debug("EXIT!")
-def set_detection_mode (domain: str, url: list):
- # DEBUG: print(f"DEBUG: domain='{domain}',url='{url}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not isinstance(url, str):
- raise ValueError("Parameter url[]='{type(url)}' is not 'list'")
- elif url == "":
- raise ValueError("Parameter 'url' is empty")
+def set_obfuscated_blocks(domain: str, obfuscated: int):
+ logger.debug("domain='%s',obfuscated=%d - CALLED!", domain, obfuscated)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(obfuscated, int):
+ raise ValueError(f"Parameter obfuscated[]='{type(obfuscated)}' is not of type 'int'")
+ elif obfuscated < 0:
+ raise ValueError(f"Parameter obfuscated={obfuscated} is not valid")
# Set timestamp
- _set_data("detection_mode", domain, url)
- # DEBUG: print("DEBUG: EXIT!")
+ _set_data("obfuscated_blocks", domain, obfuscated)
+ logger.debug("EXIT!")
-def set_detection_mode (domain: str, url: list):
- # DEBUG: print(f"DEBUG: domain='{domain}',url='{url}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not isinstance(url, str):
- raise ValueError("Parameter url[]='{type(url)}' is not 'list'")
+def set_nodeinfo_url(domain: str, url: str):
+ logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(url, str) and url is not None:
+ raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
elif url == "":
raise ValueError("Parameter 'url' is empty")
# Set timestamp
- _set_data("detection_mode", domain, url)
- # DEBUG: print("DEBUG: EXIT!")
+ _set_data("nodeinfo_url", domain, url)
+ logger.debug("EXIT!")
-def set_detection_mode (domain: str, mode: list):
- # DEBUG: print(f"DEBUG: domain='{domain}',mode='{mode}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif not isinstance(mode, str):
- raise ValueError("Parameter mode[]='{type(mode)}' is not 'list'")
+def set_detection_mode(domain: str, mode: str):
+ logger.debug("domain='%s',mode='%s' - CALLED!", domain, mode)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(mode, str) and mode is not None:
+ raise ValueError(f"Parameter mode[]='{type(mode)}' is not of type 'str'")
elif mode == "":
raise ValueError("Parameter 'mode' is empty")
# Set timestamp
_set_data("detection_mode", domain, mode)
- # DEBUG: print("DEBUG: EXIT!")
+ logger.debug("EXIT!")
+
+def set_has_obfuscation(domain: str, status: bool):
+ logger.debug("domain='%s',status='%s' - CALLED!", domain, status)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(status, bool):
+ raise ValueError(f"Parameter status[]='{type(status)}' is not of type 'bool'")
+
+ # Set timestamp
+ _set_data("has_obfuscation", domain, status)
+ logger.debug("EXIT!")
+
+def set_software(domain: str, software: str):
+ logger.debug("domain='%s',software='%s' - CALLED!", domain, software)
+ domain_helper.raise_on(domain)
+
+ if not isinstance(software, str) and software is not None:
+ raise ValueError(f"Parameter software[]='{type(software)}' is not of type 'str'")
+ elif software == "":
+ raise ValueError("Parameter 'software' is empty")
+
+ # Set timestamp
+ _set_data("software", domain, software)
+ logger.debug("EXIT!")
+
+def valid(value: str, column: str) -> bool:
+ logger.debug("value='%s' - CALLED!", value)
+ if not isinstance(value, str):
+ raise ValueError(f"Parameter value[]='{type(value)}' is not of type 'str'")
+ elif value == "":
+ raise ValueError("Parameter 'value' is empty")
+ elif not isinstance(column, str):
+ raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+ elif column == "":
+ raise ValueError("Parameter 'column' is empty")
+
+ # Query database
+ database.cursor.execute(
+ f"SELECT {column} FROM instances WHERE {column} = ? LIMIT 1", [value]
+ )
+
+ is_valid = database.cursor.fetchone() is not None
+
+ logger.debug("is_valid='%s' - EXIT!", is_valid)
+ return is_valid
+
+def translate_idnas(rows: list, column: str):
+ logger.debug("rows[]='%s' - CALLED!", type(rows))
+ if not isinstance(rows, list):
+ raise ValueError("rows[]='{type(rows)}' is not of type 'list'")
+ elif len(rows) == 0:
+ raise ValueError("Parameter 'rows' is an empty list")
+ elif not isinstance(column, str):
+ raise ValueError(f"column='{type(column)}' is not of type 'str'")
+ elif column == "":
+ raise ValueError("Parameter 'column' is empty")
+ elif column not in ["domain", "origin"]:
+ raise ValueError(f"column='{column}' is not supported")
+
+ logger.info("Checking/converting %d domain names ...", len(rows))
+ for row in rows:
+ logger.debug("row[]='%s'", type(row))
+
+ translated = row[column].encode("idna").decode("utf-8")
+ logger.debug("translated='%s',row[%s]='%s'", translated, column, row[column])
+
+ if translated != row[column]:
+ logger.info("Translated row[%s]='%s' to '%s'", column, row[column], translated)
+ if is_registered(translated, True):
+ logger.warning("Deleting row[%s]='%s' as translated='%s' already exist", column, row[column], translated)
+ database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [row[column]])
+ else:
+ database.cursor.execute(f"UPDATE instances SET {column} = ? WHERE {column} = ? LIMIT 1", [translated, row[column]])
+
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
+
+ logger.debug("EXIT!")