X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Futils.py;h=8244a78a1cce162e056e235e8244ad3fcb3f5235;hb=3ad5eb0b8994ddecadc7b7c05b2241c5d4cb0ae7;hp=46fa37efee7ca31f8f883390796b17b1ed24a19f;hpb=cb4993790ce9797ac97822ac7c230acf82dd966e;p=fba.git diff --git a/fba/utils.py b/fba/utils.py index 46fa37e..8244a78 100644 --- a/fba/utils.py +++ b/fba/utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2023 Free Software Foundation + # Copyright (C) 2023 Free Software Foundation # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published @@ -20,13 +20,11 @@ from urllib.parse import urlparse import bs4 import requests -import validators -from fba.helpers import blacklist -from fba.helpers import cookies +from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import tidyup -from fba.http import federation from fba.http import network from fba.models import instances @@ -37,145 +35,82 @@ logger = logging.getLogger(__name__) ##### Other functions ##### def is_primitive(var: any) -> bool: - logger.debug(f"var[]='{type(var)}' - CALLED!") - return type(var) in {int, str, float, bool} or var is None + logger.debug("var[]='%s' - CALLED!", type(var)) + return type(var) in {int, str, float, bool, None} or var is None def get_hash(domain: str) -> str: - logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + logger.debug("domain='%s' - CALLED!", domain) + domain_helper.raise_on(domain) return hashlib.sha256(domain.encode("utf-8")).hexdigest() def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response: - logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!") + logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout) + if not isinstance(url, str): - raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'") + raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'") elif url == "": raise ValueError("Parameter 'url' is empty") elif not isinstance(headers, dict): - raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'") + raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'") elif not isinstance(timeout, tuple): - raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'") + raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'") - logger.debug(f"Parsing url='{url}'") + logger.debug("Parsing url='%s' ...", url) components = urlparse(url) # Invoke other function, avoid trailing ? - logger.debug(f"components[{type(components)}]={components}") + logger.debug("components[%s]='%s'", type(components), components) if components.query != "": - response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout) + response = network.fetch_response( + components.netloc.split(":")[0], + f"{components.path}?{components.query}", + headers, + timeout + ) else: - response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout) - - logger.debug(f"response[]='{type(response)}' - EXXIT!") + response = network.fetch_response( + components.netloc.split(":")[0], + components.path if isinstance(components.path, str) and components.path != '' else '/', + headers, + timeout + ) + + logger.debug("response[]='%s' - EXIT!", type(response)) return response -def process_domain(domain: str, blocker: str, command: str) -> bool: - logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(blocker, str): - raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'") - elif blocker == "": - raise ValueError("Parameter 'blocker' is empty") - elif not validators.domain(blocker.split("/")[0]): - raise ValueError(f"blocker='{blocker}' is not a valid domain") - elif blocker.endswith(".arpa"): - raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!") - elif blocker.endswith(".tld"): - raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!") - elif not isinstance(command, str): - raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'") - elif command == "": - raise ValueError("Parameter 'command' is empty") - - if domain.find("*") > 0: - # Try to de-obscure it - row = instances.deobscure("*", domain) - - logger.debug(f"row[{type(row)}]='{row}'") - if row is None: - logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!") - return False - - logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'") - domain = row[0] - elif domain.find("?") > 0: - # Try to de-obscure it - row = instances.deobscure("?", domain) - - logger.debug(f"row[{type(row)}]='{row}'") - if row is None: - logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!") - return False - - logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'") - domain = row[0] - - if not is_domain_wanted(domain) - logger.debug("domain='%s' is not wanted - SKIPPED!", domain) - return False - elif instances.is_recent(domain): - logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!") - return False - - processed = False - try: - logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command) - federation.fetch_instances(domain, blocker, None, command) - processed = True - - logger.debug("Invoking cookies.clear(%s) ...", domain) - cookies.clear(domain) - except network.exceptions as exception: - logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'") - instances.set_last_error(domain, exception) - - logger.debug(f"processed='{processed}' - EXIT!") - return processed - def find_domains(tags: bs4.element.ResultSet, search: str) -> list: logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search) + if not isinstance(tags, bs4.element.ResultSet): - raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'") + raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'") elif not isinstance(search, str): - raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'") + raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'") elif search == "": raise ValueError("Parameter 'search' is empty") domains = list() + logger.debug("Parsing %d tags ...", len(tags)) for tag in tags: logger.debug("tag[]='%s'", type(tag)) domain = tidyup.domain(tag.find(search).contents[0]) + logger.debug("domain='%s' - AFTER!", domain) - logger.debug("domain='%s'", domain) if domain == "": logger.debug("tag='%s' has no domain, trying ...", tag) domain = tidyup.domain(tag.find("em").contents[0]) + logger.debug("domain='%s' - AFTER!", domain) - if not is_domain_wanted(domain): - logger.debug("domain='%s' is not wanted - SKIPPED!") + if domain == "": + logger.warning("Empty domain after checking search='%s' and tags - SKIPPED!", search) + continue + + logger.debug("domain='%s' - BEFORE!", domain) + domain = domain.encode("idna").decode("utf-8") + logger.debug("domain='%s' - AFTER!", domain) + + if not domain_helper.is_wanted(domain): + logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue logger.debug("Appending domain='%s'", domain) @@ -184,28 +119,51 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: logger.debug("domains()=%d - EXIT!", len(domains)) return domains -def is_domain_wanted (domain: str) -> bool: - logger.debug("domain='%s' - CALLED!", domain) - wanted = True +def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str: + logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash) + domain_helper.raise_on(blocker) if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'") elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - logger.debug("domain='%s' is not a valid domain name - settings False ...", domain) - wanted = False - elif domain.endswith(".arpa"): - logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain) - wanted = False - elif domain.endswith(".tld"): - logger.debug("domain='%s' is a fake domain - settings False ...", domain) - wanted = False - elif blacklist.is_blacklisted(domain): - logger.debug("domain='%s' is blacklisted - settings False ...", domain) - wanted = False - - logger.debug("wanted='%s' - EXIT!", wanted) - return wanted + raise ValueError("Parameter domain is empty") + elif not isinstance(domain_hash, str) and domain_hash is not None: + raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'") + + logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker) + instances.set_has_obfuscation(blocker, False) + + if domain.find("*") >= 0: + logger.debug("blocker='%s' uses obfuscated domains", blocker) + instances.set_has_obfuscation(blocker, True) + + # Obscured domain name with no hash + row = instances.deobfuscate("*", domain, domain_hash) + + logger.debug("row[]='%s'", type(row)) + if row is not None: + logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"]) + domain = row["domain"] + else: + logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain) + elif domain.find("?") >= 0: + logger.debug("blocker='%s' uses obfuscated domains", blocker) + instances.set_has_obfuscation(blocker, True) + + # Obscured domain name with no hash + row = instances.deobfuscate("?", domain, domain_hash) + + logger.debug("row[]='%s'", type(row)) + if row is not None: + logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"]) + domain = row["domain"] + else: + logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain) + else: + logger.debug("domain='%s' is not obfuscated", domain) + + logger.debug("domain='%s' - EXIT!", domain) + return domain + +def base_url() -> str: + return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"