X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=fba%2Futils.py;h=30f3a1c0b43d019173ec068cc58f158f5e085bb9;hb=7c0ba6fa9ac609b8341127be4fa29691bdd2b232;hp=577e9ab4635c317cd06064985bc308570b73db9f;hpb=dca35966789ae46f89735cfbbc89bd9e2eea8844;p=fba.git diff --git a/fba/utils.py b/fba/utils.py index 577e9ab..30f3a1c 100644 --- a/fba/utils.py +++ b/fba/utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2023 Free Software Foundation + # Copyright (C) 2023 Free Software Foundation # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published @@ -23,13 +23,13 @@ import requests import validators from fba.helpers import blacklist -from fba.helpers import cookies from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import federation from fba.http import network +from fba.models import blocks from fba.models import instances logging.basicConfig(level=logging.INFO) @@ -39,7 +39,7 @@ logger = logging.getLogger(__name__) def is_primitive(var: any) -> bool: logger.debug("var[]='%s' - CALLED!", type(var)) - return type(var) in {int, str, float, bool} or var is None + return type(var) in {int, str, float, bool, None} or var is None def get_hash(domain: str) -> str: logger.debug("domain='%s' - CALLED!", domain) @@ -50,13 +50,13 @@ def get_hash(domain: str) -> str: def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response: logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout) if not isinstance(url, str): - raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'") + raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'") elif url == "": raise ValueError("Parameter 'url' is empty") elif not isinstance(headers, dict): - raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'") + raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'") elif not isinstance(timeout, tuple): - raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'") + raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'") logger.debug("Parsing url='%s' ...", url) components = urlparse(url) @@ -77,46 +77,16 @@ def process_domain(domain: str, blocker: str, command: str) -> bool: domain_helper.raise_on(blocker) if not isinstance(command, str): - raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'") + raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'") elif command == "": raise ValueError("Parameter 'command' is empty") - logger.debug("domain='%s' - BEFORE!") - if domain.find("*") > 0: - logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker) - instances.set_has_obfuscation(blocker, True) - - # Try to de-obscure it - row = instances.deobfuscate("*", domain) - - logger.debug("row[%s]='%s'", type(row), row) - if row is None: - logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain) - return False - - logger.debug("domain='%s' de-obscured to '%s'", domain, row[0]) - domain = row[0] - elif domain.find("?") > 0: - logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker) - instances.set_has_obfuscation(blocker, True) - - # Try to de-obscure it - row = instances.deobfuscate("?", domain) - - logger.debug("row[%s]='%s'", type(row), row) - if row is None: - logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain) - return False - - logger.debug("domain='%s' de-obscured to '%s'", domain, row[0]) - domain = row[0] - else: - logger.debug("blocker='%s' has NO obfuscation on their block list", blocker) - instances.set_has_obfuscation(blocker, False) + logger.debug("domain='%s' - BEFORE!", domain) + domain = deobfuscate_domain(domain, blocker) logger.debug("domain='%s' - DEOBFUSCATED!", domain) if instances.has_pending(blocker): - logger.debug("Invoking instances.update_data(%s) ...", blocker) + logger.debug("Flushing updates for blocker='%s' ...", blocker) instances.update_data(blocker) if not is_domain_wanted(domain): @@ -131,22 +101,24 @@ def process_domain(domain: str, blocker: str, command: str) -> bool: logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command) federation.fetch_instances(domain, blocker, None, command) processed = True - - logger.debug("Invoking cookies.clear(%s) ...", domain) - cookies.clear(domain) except network.exceptions as exception: - logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain) + logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain) instances.set_last_error(domain, exception) + logger.debug("Checking if domain='%s' has pending updates ...", domain) + if instances.has_pending(domain): + logger.debug("Flushing updates for domain='%s' ...", domain) + instances.update_data(domain) + logger.debug("processed='%s' - EXIT!", processed) return processed def find_domains(tags: bs4.element.ResultSet, search: str) -> list: logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search) if not isinstance(tags, bs4.element.ResultSet): - raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'") + raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'") elif not isinstance(search, str): - raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'") + raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'") elif search == "": raise ValueError("Parameter 'search' is empty") @@ -162,7 +134,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: domain = tidyup.domain(tag.find("em").contents[0]) if not is_domain_wanted(domain): - logger.debug("domain='%s' is not wanted - SKIPPED!") + logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue logger.debug("Appending domain='%s'", domain) @@ -176,7 +148,7 @@ def is_domain_wanted(domain: str) -> bool: wanted = True if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif domain.lower() != domain: @@ -187,12 +159,108 @@ def is_domain_wanted(domain: str) -> bool: elif domain.endswith(".arpa"): logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain) wanted = False + elif domain.endswith(".onion"): + logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain) + wanted = False elif domain.endswith(".tld"): logger.debug("domain='%s' is a fake domain - settings False ...", domain) wanted = False elif blacklist.is_blacklisted(domain): logger.debug("domain='%s' is blacklisted - settings False ...", domain) wanted = False + elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0): + logger.debug("domain='%s' is a single user", domain) + wanted = False + elif domain.find("/tag/") > 0: + logger.debug("domain='%s' is a tag", domain) + wanted = False logger.debug("wanted='%s' - EXIT!", wanted) return wanted + +def deobfuscate_domain(domain: str, blocker: str, domain_hash: str = None) -> str: + logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash) + domain_helper.raise_on(blocker) + + if not isinstance(domain, str): + raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'") + elif domain == "": + raise ValueError("Parameter domain is empty") + elif not isinstance(domain_hash, str) and domain_hash is not None: + raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'") + + if domain.find("*") >= 0: + logger.debug("blocker='%s' uses obfuscated domains", blocker) + + # Obscured domain name with no hash + row = instances.deobfuscate("*", domain, domain_hash) + + logger.debug("row[]='%s'", type(row)) + if row is not None: + logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"]) + domain = row["domain"] + else: + logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain) + instances.set_has_obfuscation(blocker, True) + elif domain.find("?") >= 0: + logger.debug("blocker='%s' uses obfuscated domains", blocker) + + # Obscured domain name with no hash + row = instances.deobfuscate("?", domain, domain_hash) + + logger.debug("row[]='%s'", type(row)) + if row is not None: + logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"]) + domain = row["domain"] + else: + logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain) + instances.set_has_obfuscation(blocker, True) + else: + logger.debug("domain='%s' is not obfuscated", domain) + + logger.debug("domain='%s' - EXIT!", domain) + return domain + +def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool: + logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level) + domain_helper.raise_on(blocker) + domain_helper.raise_on(blocked) + + added = False + if not isinstance(reason, str) and reason is not None: + raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'") + elif not isinstance(block_level, str): + raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'") + elif block_level == "": + raise ValueError("Parameter block_level is empty") + + if not blocks.is_instance_blocked(blocker, blocked, block_level): + logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level) + blocks.add_instance(blocker, blocked, reason, block_level) + added = True + else: + logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked) + blocks.update_last_seen(blocker, blocked, block_level) + + logger.debug("added='%s' - EXIT!", added) + return added + +def alias_block_level(block_level: str) -> str: + logger.debug("block_level='%s' - CALLED!", block_level) + if not isinstance(block_level, str): + raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level)) + elif block_level == "": + raise ValueError("Parameter 'block_level' is empty") + + if block_level == "silence": + logger.debug("Block level 'silence' has been changed to 'silenced'") + block_level = "silenced" + elif block_level == "suspend": + logger.debug("Block level 'suspend' has been changed to 'suspended'") + block_level = "suspended" + elif block_level == "nsfw": + logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'") + block_level = "media_nsfw" + + logger.debug("block_level='%s' - EXIT!", block_level) + return block_level