import reqto
import validators
+from urllib.parse import urlparse
+
from fba import csrf
from fba import database
from fba import utils
logger.debug("Determined software='%s'", software)
if software != row["software"] and software is not None:
- logger.warning("Software type has changed from '%s' to '%s'!", row["software"], software)
+ logger.warning("Software type for row[domain]='%s' has changed from '%s' to '%s'!", row["domain"], row["software"], software)
instances.set_software(row["domain"], software)
instances.set_success(row["domain"])
import logging
-from urllib.parse import urlparse
-
import bs4
import reqto
import requests
headers=network.web_headers,
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
- components = urlparse(response.url)
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
- if response.ok and response.status_code < 300 and response.text.strip() != "" and response.text.find("<html") > 0 and domain == components.netloc:
+ if response.ok and response.status_code < 300 and response.text.strip() != "" and response.text.find("<html") > 0 and domain_helper.is_in_url(domain, response.url):
# Save cookies
logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
cookies.store(domain, response.cookies.get_dict())
if tag is not None:
logger.debug("Adding CSRF token='%s' for domain='%s'", tag["content"], domain)
reqheaders["X-CSRF-Token"] = tag["content"]
- elif domain != components.netloc:
- logger.warning("domain='%s' doesn't match components.netloc='%s', maybe redirect to other domain?", domain, components.netloc)
- message = f"Redirect from domain='{domain}' to components.netloc='{components.netloc}'"
+ elif not domain_helper.is_in_url(domain, response.url):
+ logger.warning("domain='%s' doesn't match with response.url='%s', maybe redirect to other domain?", domain, response.url)
+ message = f"Redirect from domain='{domain}' to response.url='{response.url}'"
instances.set_last_error(domain, message)
raise requests.exceptions.TooManyRedirects(message)
import logging
+from urllib.parse import urlparse
+
import validators
logging.basicConfig(level=logging.INFO)
raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
logger.debug("EXIT!")
+
+def is_in_url(domain: str, url: str) -> bool:
+    """Check whether the given domain is the host of the given URL.
+
+    The domain is first validated by raise_on(), then converted to its IDNA
+    (punycode) form and compared against both the URL's raw network location
+    and its parsed host name, so a URL carrying an explicit port can still
+    match on the host name.
+
+    Raises ValueError if 'url' is not a string or is an empty string.
+    Returns True when the domain matches the URL's host, False otherwise.
+    """
+    logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
+    raise_on(domain)
+
+    if not isinstance(url, str):
+        # The type must be interpolated into the message itself: exceptions do
+        # not apply %-formatting to extra positional arguments.
+        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+    elif url == "":
+        raise ValueError("Parameter 'url' is empty")
+
+    components = urlparse(url)
+    punycode = domain.encode("idna").decode("utf-8")
+
+    logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
+    # netloc may include a port (and credentials), hostname does not - accept either
+    is_found = (punycode == components.netloc or punycode == components.hostname)
+
+    logger.debug("is_found='%s' - EXIT!", is_found)
+    return is_found
(config.get("connection_timeout"), config.get("read_timeout")),
allow_redirects=True
)
- components = urlparse(response.url)
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
- if response.ok and response.status_code < 300 and response.text.find("<html") > 0 and components.netloc == domain:
+ if response.ok and response.status_code < 300 and response.text.find("<html") > 0 and domain_helper.is_in_url(domain, response.url):
logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
doc = bs4.BeautifulSoup(response.text, "html.parser")
if software is not None and software != "":
logger.debug("domain='%s' has og:platform='%s' - Setting detection_mode=PLATFORM ...", domain, software)
instances.set_detection_mode(domain, "PLATFORM")
- elif domain != components.netloc:
- logger.warning("domain='%s' doesn't match components.netloc='%s', maybe redirect to other domain?", domain, components.netloc)
- message = f"Redirect from domain='{domain}' to components.netloc='{components.netloc}'"
+ elif not domain_helper.is_in_url(domain, response.url):
+ logger.warning("domain='%s' doesn't match response.url='%s', maybe redirect to other domain?", domain, response.url)
+ message = f"Redirect from domain='{domain}' to response.url='{response.url}'"
instances.set_last_error(domain, message)
instances.set_software(domain, None)
instances.set_detection_mode(domain, None)