From: Roland Häder
Date: Tue, 4 Jul 2023 12:57:02 +0000 (+0200)
Subject: Continued:
X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=35f7057194d0a6bfbb58dc2a34e0ae1562643b99;p=fba.git

Continued:
- added domain.is_in_url() to check whether a domain matches the netloc or
  hostname part of a URL. This function encodes the domain into punycode
  before comparing it
---
diff --git a/fba/commands.py b/fba/commands.py
index d1a09c0..89a3757 100644
--- a/fba/commands.py
+++ b/fba/commands.py
@@ -27,6 +27,8 @@ import markdown
 import reqto
 import validators
 
+from urllib.parse import urlparse
+
 from fba import csrf
 from fba import database
 from fba import utils
@@ -1538,7 +1540,7 @@ def update_nodeinfo(args: argparse.Namespace) -> int:
 
             logger.debug("Determined software='%s'", software)
             if software != row["software"] and software is not None:
-                logger.warning("Software type has changed from '%s' to '%s'!", row["software"], software)
+                logger.warning("Software type for row[domain]='%s' has changed from '%s' to '%s'!", row["domain"], row["software"], software)
                 instances.set_software(row["domain"], software)
 
             instances.set_success(row["domain"])
diff --git a/fba/csrf.py b/fba/csrf.py
index 2ed3613..c523159 100644
--- a/fba/csrf.py
+++ b/fba/csrf.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from urllib.parse import urlparse
-
 import bs4
 import reqto
 import requests
@@ -50,10 +48,9 @@ def determine(domain: str, headers: dict) -> dict:
         headers=network.web_headers,
         timeout=(config.get("connection_timeout"), config.get("read_timeout"))
     )
-    components = urlparse(response.url)
 
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
-    if response.ok and response.status_code < 300 and response.text.strip() != "" and response.text.find("<html") > 0 and domain == components.netloc:
+    if response.ok and response.status_code < 300 and response.text.strip() != "" and response.text.find("<html") > 0 and domain_helper.is_in_url(domain, response.url):
         # Save cookies
         logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
         cookies.store(domain, response.cookies.get_dict())
@@ -70,9 +67,9 @@ def determine(domain: str, headers: dict) -> dict:
         if tag is not None:
             logger.debug("Adding CSRF token='%s' for domain='%s'", tag["content"], domain)
             reqheaders["X-CSRF-Token"] = tag["content"]
-    elif domain != components.netloc:
-        logger.warning("domain='%s' doesn't match components.netloc='%s', maybe redirect to other domain?", domain, components.netloc)
-        message = f"Redirect from domain='{domain}' to components.netloc='{components.netloc}'"
+    elif not domain_helper.is_in_url(domain, response.url):
+        logger.warning("domain='%s' doesn't match with response.url='%s', maybe redirect to other domain?", domain, response.url)
+        message = f"Redirect from domain='{domain}' to response.url='{response.url}'"
         instances.set_last_error(domain, message)
         raise requests.exceptions.TooManyRedirects(message)
 
diff --git a/fba/helpers/domain.py b/fba/helpers/domain.py
index 76738ac..4e7dc38 100644
--- a/fba/helpers/domain.py
+++ b/fba/helpers/domain.py
@@ -16,6 +16,8 @@
 
 import logging
 
+from urllib.parse import urlparse
+
 import validators
 
 logging.basicConfig(level=logging.INFO)
@@ -39,3 +41,21 @@ def raise_on(domain: str):
         raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
 
     logger.debug("EXIT!")
+
+def is_in_url(domain: str, url: str) -> bool:
+    logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
+    raise_on(domain)
+
+    if not isinstance(url, str):
raise ValueError(f"Parameter url[]='%s' is not 'str'", type(url)) + elif url == "": + raise ValueError("Parameter 'url' is empty") + + components = urlparse(url) + punycode = domain.encode("idna").decode("utf-8") + + logger.debug("components[]='%s',punycode='%s'", type(components), punycode) + is_found = (punycode == components.netloc or punycode == components.hostname) + + logger.debug("is_found='%s' - EXIT!", is_found) + return is_found diff --git a/fba/http/federation.py b/fba/http/federation.py index 763ea60..f8b0450 100644 --- a/fba/http/federation.py +++ b/fba/http/federation.py @@ -410,10 +410,9 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: (config.get("connection_timeout"), config.get("read_timeout")), allow_redirects=True ) - components = urlparse(response.url) logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text)) - if response.ok and response.status_code < 300 and response.text.find(" 0 and components.netloc == domain: + if response.ok and response.status_code < 300 and response.text.find(" 0 and domain_helper.is_in_url(domain, response.url): logger.debug("Parsing response.text()=%d Bytes ...", len(response.text)) doc = bs4.BeautifulSoup(response.text, "html.parser") @@ -447,9 +446,9 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: if software is not None and software != "": logger.debug("domain='%s' has og:platform='%s' - Setting detection_mode=PLATFORM ...", domain, software) instances.set_detection_mode(domain, "PLATFORM") - elif domain != components.netloc: - logger.warning("domain='%s' doesn't match components.netloc='%s', maybe redirect to other domain?", domain, components.netloc) - message = f"Redirect from domain='{domain}' to components.netloc='{components.netloc}'" + elif not domain_helper.is_in_url(domain, response.url): + logger.warning("domain='%s' doesn't match response.url='%s', maybe redirect to other domain?", domain, response.url) + message = f"Redirect from domain='{domain}' to response.url='{response.url}'" instances.set_last_error(domain, message) instances.set_software(domain, None) instances.set_detection_mode(domain, None)