X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Futils.py;h=8244a78a1cce162e056e235e8244ad3fcb3f5235;hb=03f34913cb74f05e6470dd576bf8fee8625a6328;hp=756dc5bf18428b69950afef719fd319058937b30;hpb=f384904c74557de49ca7fddb0555821064a8bb13;p=fba.git diff --git a/fba/utils.py b/fba/utils.py index 756dc5b..8244a78 100644 --- a/fba/utils.py +++ b/fba/utils.py @@ -20,16 +20,13 @@ from urllib.parse import urlparse import bs4 import requests -import validators -from fba.helpers import blacklist +from fba.helpers import config from fba.helpers import domain as domain_helper from fba.helpers import tidyup -from fba.http import federation from fba.http import network -from fba.models import blocks from fba.models import instances logging.basicConfig(level=logging.INFO) @@ -49,6 +46,7 @@ def get_hash(domain: str) -> str: def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response: logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout) + if not isinstance(url, str): raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'") elif url == "": @@ -64,57 +62,26 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon # Invoke other function, avoid trailing ? logger.debug("components[%s]='%s'", type(components), components) if components.query != "": - response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout) + response = network.fetch_response( + components.netloc.split(":")[0], + f"{components.path}?{components.query}", + headers, + timeout + ) else: - response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout) + response = network.fetch_response( + components.netloc.split(":")[0], + components.path if isinstance(components.path, str) and components.path != '' else '/', + headers, + timeout + ) logger.debug("response[]='%s' - EXIT!", type(response)) return response -def process_domain(domain: str, blocker: str, command: str) -> bool: - logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command) - domain_helper.raise_on(domain) - domain_helper.raise_on(blocker) - - if not isinstance(command, str): - raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'") - elif command == "": - raise ValueError("Parameter 'command' is empty") - - logger.debug("domain='%s' - BEFORE!", domain) - domain = deobfuscate(domain, blocker) - - logger.debug("domain='%s' - DEOBFUSCATED!", domain) - if instances.has_pending(blocker): - logger.debug("Flushing updates for blocker='%s' ...", blocker) - instances.update_data(blocker) - - if not is_domain_wanted(domain): - logger.debug("domain='%s' is not wanted - SKIPPED!", domain) - return False - elif instances.is_recent(domain): - logger.debug("domain='%s' has been recently checked - SKIPPED!", domain) - return False - - processed = False - try: - logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command) - federation.fetch_instances(domain, blocker, None, command) - processed = True - except network.exceptions as exception: - logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain) - instances.set_last_error(domain, exception) - - logger.debug("Checking if domain='%s' has pending updates ...", domain) - if instances.has_pending(domain): - logger.debug("Flushing updates for domain='%s' ...", domain) - instances.update_data(domain) - - logger.debug("processed='%s' - EXIT!", processed) - return processed - def find_domains(tags: bs4.element.ResultSet, search: str) -> list: logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search) + if not isinstance(tags, bs4.element.ResultSet): raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'") elif not isinstance(search, str): @@ -142,7 +109,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue @@ -152,41 +119,6 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: logger.debug("domains()=%d - EXIT!", len(domains)) return domains -def is_domain_wanted(domain: str) -> bool: - logger.debug("domain='%s' - CALLED!", domain) - - wanted = True - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - wanted = False - elif not validators.domain(domain.split("/")[0]): - logger.debug("domain='%s' is not a valid domain name - settings False ...", domain) - wanted = False - elif domain.endswith(".arpa"): - logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain) - wanted = False - elif domain.endswith(".onion"): - logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain) - wanted = False - elif domain.endswith(".tld"): - logger.debug("domain='%s' is a fake domain - settings False ...", domain) - wanted = False - elif blacklist.is_blacklisted(domain): - logger.debug("domain='%s' is blacklisted - settings False ...", domain) - wanted = False - elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0): - logger.debug("domain='%s' is a single user", domain) - wanted = False - elif domain.find("/tag/") > 0: - logger.debug("domain='%s' is a tag", domain) - wanted = False - - logger.debug("wanted='%s' - EXIT!", wanted) - return wanted - def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str: logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash) domain_helper.raise_on(blocker) @@ -198,8 +130,12 @@ def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str: elif not isinstance(domain_hash, str) and domain_hash is not None: raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'") + logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker) + instances.set_has_obfuscation(blocker, False) + if domain.find("*") >= 0: logger.debug("blocker='%s' uses obfuscated domains", blocker) + instances.set_has_obfuscation(blocker, True) # Obscured domain name with no hash row = instances.deobfuscate("*", domain, domain_hash) @@ -210,9 +146,9 @@ def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str: domain = row["domain"] else: logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain) - instances.set_has_obfuscation(blocker, True) elif domain.find("?") >= 0: logger.debug("blocker='%s' uses obfuscated domains", blocker) + instances.set_has_obfuscation(blocker, True) # Obscured domain name with no hash row = instances.deobfuscate("?", domain, domain_hash) @@ -223,56 +159,11 @@ def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str: domain = row["domain"] else: logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain) - instances.set_has_obfuscation(blocker, True) else: logger.debug("domain='%s' is not obfuscated", domain) logger.debug("domain='%s' - EXIT!", domain) return domain -def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool: - logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level) - domain_helper.raise_on(blocker) - domain_helper.raise_on(blocked) - - added = False - if not isinstance(reason, str) and reason is not None: - raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'") - elif not isinstance(block_level, str): - raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'") - elif block_level == "": - raise ValueError("Parameter block_level is empty") - - if not blocks.is_instance_blocked(blocker, blocked, block_level): - logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level) - blocks.add_instance(blocker, blocked, reason, block_level) - added = True - else: - logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked) - blocks.update_last_seen(blocker, blocked, block_level) - - logger.debug("added='%s' - EXIT!", added) - return added - -def alias_block_level(block_level: str) -> str: - logger.debug("block_level='%s' - CALLED!", block_level) - if not isinstance(block_level, str): - raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level)) - elif block_level == "": - raise ValueError("Parameter 'block_level' is empty") - - if block_level == "silence": - logger.debug("Block level 'silence' has been changed to 'silenced'") - block_level = "silenced" - elif block_level == "suspend": - logger.debug("Block level 'suspend' has been changed to 'suspended'") - block_level = "suspended" - elif block_level == "nsfw": - logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'") - block_level = "media_nsfw" - elif block_level == "quarantined_instances": - logger.debug("Block level 'quarantined_instances' has been changed to 'quarantined'") - block_level = "quarantined" - - logger.debug("block_level='%s' - EXIT!", block_level) - return block_level +def base_url() -> str: + return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"