]
def is_blacklisted(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
blacklisted = False
+ logger.debug("Checking %d blacklist entries ...", len(blacklist))
for peer in blacklist:
- logger.debug(f"Checking peer='{peer}' ...")
+ logger.debug("Checking peer='%s' ...", peer)
if peer in domain:
- logger.debug(f"domain='{domain}' is blacklisted.")
+ logger.debug("domain='%s' is blacklisted.", domain)
blacklisted = True
- logger.debug(f"blacklisted='{blacklisted}' - EXIT!")
+ logger.debug("blacklisted='%s' - EXIT!", blacklisted)
return blacklisted
return key in _cache
def set_all(key: str, rows: list, value: any):
- logger.debug(f"key='{key}',rows()={len(rows)},value[]='{type(value)}' - CALLED!")
+ logger.debug("key='%s',rows()=%d,value[]='%s' - CALLED!", key, len(rows), type(value))
if not isinstance(key, str):
- raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ raise ValueError(f"Parameter key[]='{type(key)}' is not 'str'")
elif not key_exists(key):
- logger.debug(f"Cache for key='{key}' not initialized.")
- _cache[key] = {}
+ logger.debug("Cache for key='%s' not initialized.", key)
+ _cache[key] = dict()
for sub in rows:
- logger.debug(f"Setting key='{key}',sub[{type(sub)}]='{sub}'")
+ logger.debug("Setting key='%s',sub[%s]='%s'", key, type(sub), sub)
if isinstance(sub, tuple):
- logger.debug(f"Setting key='{key}',sub[{type(sub)}]='{sub}',value[]='{type(value)}'")
+ logger.debug("Setting key='%s',sub[%s]='%s',value[]='%s'", key, type(sub), sub, type(value))
_cache[key][sub[0]] = value
else:
logger.warning("Unsupported type sub[]='%s'", type(sub))
logger.debug("EXIT!")
def set_sub_key(key: str, sub: str, value: any):
- logger.debug(f"key='{key}',sub='{sub}',value[]='{type(value)}' - CALLED!")
+ logger.debug("key='%s',sub='%s',value[]='%s' - CALLED!", key, sub, type(value))
if not isinstance(key, str):
- raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ raise ValueError(f"Parameter key[]='{type(key)}' is not 'str'")
elif not isinstance(sub, str):
- raise ValueError("Parameter sub[]='{type(sub)}' is not 'str'")
+ raise ValueError(f"Parameter sub[]='{type(sub)}' is not 'str'")
elif not key_exists(key):
raise Exception(f"Cache for key='{key}' is not initialized, but function invoked")
- logger.debug(f"Setting key='{key}',sub='{sub}',value[]='{type(value)}' ...")
+ logger.debug("Setting key='%s',sub='%s',value[]='%s' ...", key, sub, type(value))
_cache[key][sub] = value
logger.debug("EXIT!")
def sub_key_exists(key: str, sub: str) -> bool:
- logger.debug(f"key='{key}',sub='{sub}' - CALLED!")
+ logger.debug("key='%s',sub='%s' - CALLED!", key, sub)
if not isinstance(key, str):
- raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
+ raise ValueError(f"Parameter key[]='{type(key)}' is not 'str'")
elif not isinstance(sub, str):
- raise ValueError("Parameter sub[]='{type(sub)}' is not 'str'")
+ raise ValueError(f"Parameter sub[]='{type(sub)}' is not 'str'")
elif not key_exists(key):
raise Exception(f"Cache for key='{key}' is not initialized, but function invoked")
exists = sub in _cache[key]
- logger.debug(f"exists='{exists}' - EXIT!")
+ logger.debug("exists='%s' - EXIT!", exists)
return exists
_config = json.loads(f.read())
def get(key: str) -> any:
- logger.debug(f"key[{type(key)}]={key} - CALLED!")
+ logger.debug("key[%s]='%s' - CALLED!", type(key), key)
if not isinstance(key, str):
raise ValueError(f"Parameter key[]='{type(key)}' is not 'str'")
elif key == "":
elif not key in _config:
raise KeyError(f"key='{key}' does not exist in _config array")
- logger.debug(f"_config[{key}]={_config[key]} - EXIT!")
+ logger.debug("_config[%s][%s]='%s - EXIT!", key, type(_config[key]), _config[key])
return _config[key]
logger.debug("EXIT!")
def get_all(domain: str) -> dict:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
+
if not has(domain):
raise Exception(f"domain='{domain}' has no cookies stored, maybe invoke store() first?")
- logger.debug(f"_cookies[{domain}]()={len(_cookies[domain])} - EXIT!")
+ logger.debug("_cookies[%s]()=%d - EXIT!", domain, len(_cookies[domain]))
return _cookies[domain]
def has (domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
has_cookies = domain in _cookies
- logger.debug(f"has_cookies='{has_cookies}' - EXIT!")
+ logger.debug("has_cookies='%s' - EXIT!", has_cookies)
return has_cookies
def clear (domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
if has(domain):
logger = logging.getLogger(__name__)
def has_key(lists: list, key: str, value: any) -> bool:
- logger.debug(f"lists()={len(lists)},key='{key}',value[]='{type(value)}' - CALLED!")
+ logger.debug("lists()=%d,key='%s',value[]='%s' - CALLED!", len(lists), key, type(value))
if not isinstance(lists, list):
raise ValueError(f"Parameter lists[]='{type(lists)}' is not 'list'")
elif not isinstance(key, str):
raise ValueError("Parameter 'key' is empty")
has = False
- logger.debug(f"Checking lists()={len(lists)} ...")
+ logger.debug("Checking lists()=%d ...", len(lists))
for row in lists:
- logger.debug(f"row['{type(row)}']={row}")
+ logger.debug("row[%s]='%s", type(row), row)
if not isinstance(row, dict):
raise ValueError(f"row[]='{type(row)}' is not 'dict'")
elif not key in row:
raise KeyError(f"Cannot find key='{key}'")
elif row[key] == value:
+ logger.debug("row[%s][]='%s' matches value[]='%s'", key, type(row[key]), type(value))
has = True
break
- logger.debug(f"has={has} - EXIT!")
+ logger.debug("has='%s' - EXIT!", has)
return has
return data
def fetch_wellknown_nodeinfo(domain: str) -> dict:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
# No CSRF by default, you don't have to add network.api_headers by yourself here
}
try:
- logger.debug(f"Sending POST to domain='{domain}',path='{path}',data='{data}',headers({len(headers)})={headers}")
+ logger.debug("Sending POST to domain='%s',path='%s',data='%s',headers(%d)='%s'", domain, path, data, len(headers), headers)
response = reqto.post(
f"https://{domain}{path}",
data=data,
headers={**api_headers, **headers},
timeout=(config.get("connection_timeout"), config.get("read_timeout")),
- cookies=cookies.get_all(domain) if cookies.has(domain) else {}
+ cookies=cookies.get_all(domain) if cookies.has(domain) else dict()
)
json_reply["json"] = json_from_response(response)
}
try:
- logger.debug(f"Sending GET to domain='{domain}',path='{path}',timeout({len(timeout)})={timeout}")
+ logger.debug("Sending GET to domain='%s',path='%s',timeout(%d)='%s'", domain, path, len(timeout), timeout)
response = reqto.get(
f"https://{domain}{path}",
headers={**api_headers, **headers},
)
except exceptions as exception:
- logger.debug(f"Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+ logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
json_reply["status_code"] = 999
json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
json_reply["exception"] = exception
logger.debug("EXIT!")
def has_pending(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
has = False
return has
def update_data(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
if not has_pending(domain):
raise Exception(f"domain='{domain}' has no pending instance data, but function invoked")
logger.debug("EXIT!")
def set_last_nodeinfo(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("Updating last_nodeinfo for domain='%s'", domain)
logger.debug("EXIT!")
def is_registered(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
if not cache.key_exists("is_registered"):
logger.debug("Cache for 'is_registered' not initialized, fetching all rows ...")
database.cursor.execute("SELECT domain FROM instances")
return registered
def is_recent(domain: str) -> bool:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
if not is_registered(domain):
logger.debug(f"domain='{domain}' is not registered, returning False - EXIT!")
return row
def set_last_blocked(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
# Set timestamp
logger.debug("EXIT!")
def set_last_instance_fetch(domain: str):
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
# Set timestamp
#logger.setLevel(logging.DEBUG)
def fetch_blocks(domain: str) -> list:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
blocklist = list()
logger = logging.getLogger(__name__)
def fetch_peers(domain: str) -> list:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
peers = list()
}
def fetch_blocks_from_about(domain: str) -> dict:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("Fetching mastodon blocks from domain='%s'", domain)
logger = logging.getLogger(__name__)
def fetch_peers(domain: str) -> list:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("domain='%s' is misskey, sending API POST request ...", domain)
return peers
def fetch_blocks(domain: str) -> list:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("Fetching misskey blocks from domain='%s'", domain)
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
- logger.debug("blocked='%s' - DEobfuscatED!", blocked)
+ logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
- logger.debug("blocked='%s' - DEobfuscatED!", blocked)
+ logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
blocked = row[0]
- logger.debug("blocked='%s' - DEobfuscatED!", blocked)
+ logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
if not utils.is_domain_wanted(blocked):
logger.warning("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
return blockdict
def fetch_blocks_from_about(domain: str) -> dict:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
logger.debug("Fetching mastodon blocks from domain='%s'", domain)
##### Other functions #####
def is_primitive(var: any) -> bool:
- logger.debug(f"var[]='{type(var)}' - CALLED!")
+ logger.debug("var[]='%s' - CALLED!", type(var))
return type(var) in {int, str, float, bool} or var is None
def get_hash(domain: str) -> str:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
- logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
+ logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
if not isinstance(url, str):
raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
elif url == "":
elif not isinstance(timeout, tuple):
raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
- logger.debug(f"Parsing url='{url}'")
+ logger.debug("Parsing url='%s' ...", url)
components = urlparse(url)
# Invoke other function, avoid trailing ?
- logger.debug(f"components[{type(components)}]={components}")
+ logger.debug("components[%s]='%s'", type(components), components)
if components.query != "":
response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
else:
response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
- logger.debug(f"response[]='{type(response)}' - EXIT!")
+ logger.debug("response[]='%s' - EXIT!", type(response))
return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
- logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
+ logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
domain_helper.raise_on(domain)
domain_helper.raise_on(blocker)
elif command == "":
raise ValueError("Parameter 'command' is empty")
+ logger.debug("domain='%s' - BEFORE!")
if domain.find("*") > 0:
logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
instances.set_has_obfuscation(blocker, True)
# Try to de-obscure it
row = instances.deobfuscate("*", domain)
- logger.debug(f"row[{type(row)}]='{row}'")
+ logger.debug("row[%s]='%s'", type(row), row)
if row is None:
logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
return False
- logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
+ logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
domain = row[0]
elif domain.find("?") > 0:
logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
# Try to de-obscure it
row = instances.deobfuscate("?", domain)
- logger.debug(f"row[{type(row)}]='{row}'")
+ logger.debug("row[%s]='%s'", type(row), row)
if row is None:
logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
return False
- logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
+ logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
domain = row[0]
else:
logger.debug("blocker='%s' has NO obfuscation on their block list", blocker)
instances.set_has_obfuscation(blocker, False)
+ logger.debug("domain='%s' - DEOBFUSCATED!", domain)
if instances.has_pending(blocker):
logger.debug("Invoking instances.update_data(%s) ...", blocker)
instances.update_data(blocker)
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
return False
elif instances.is_recent(domain):
- logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
+ logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
return False
processed = False
logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
- logger.debug(f"processed='{processed}' - EXIT!")
+ logger.debug("processed='%s' - EXIT!", processed)
return processed
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
raise ValueError("Parameter 'search' is empty")
domains = list()
+ logger.debug("Parsing %d tags ...", len(tags))
for tag in tags:
logger.debug("tag[]='%s'", type(tag))
domain = tidyup.domain(tag.find(search).contents[0])