X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Fcsrf.py;h=f529af35017c469cb860e20ed0308cb2a01edf78;hb=ce8867063ef798d15ce07dd573e65a647090a3a7;hp=89abd0562e26076b55c62470a503080287a252b4;hpb=cb4993790ce9797ac97822ac7c230acf82dd966e;p=fba.git diff --git a/fba/csrf.py b/fba/csrf.py index 89abd05..f529af3 100644 --- a/fba/csrf.py +++ b/fba/csrf.py @@ -18,38 +18,31 @@ import logging import bs4 import reqto -import validators +import requests from fba.helpers import config from fba.helpers import cookies +from fba.helpers import domain as domain_helper from fba.http import network +from fba.models import instances + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def determine(domain: str, headers: dict) -> dict: - logger.debug(f"domain='{domain}',headers()={len(headers)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(headers, dict): - raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'") + logger.debug("domain='%s',headers()=%d - CALLED!", domain, len(headers)) + domain_helper.raise_on(domain) + + if not isinstance(headers, dict): + raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'") # Default headers with no CSRF reqheaders = headers # Fetch / to check for meta tag indicating csrf - logger.debug(f"Fetching / from domain='{domain}' for CSRF check ...") + logger.debug("Fetching / from domain='%s' for CSRF check ...", domain) response = reqto.get( f"https://{domain}/", headers=network.web_headers, @@ -57,9 +50,9 @@ def determine(domain: str, headers: dict) -> dict: ) logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text)) - if response.ok and response.status_code < 300 and response.text != "" and response.text.find(" 0: + if response.ok and response.status_code < 300 and response.text.strip() != "" and response.text.find(" 0 and domain_helper.is_in_url(domain, response.url): # Save cookies - logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...") + logger.debug("Parsing response.text()=%d Bytes ...", len(response.text)) cookies.store(domain, response.cookies.get_dict()) # Parse text @@ -67,13 +60,18 @@ def determine(domain: str, headers: dict) -> dict: response.text, "html.parser" ) - logger.debug(f"meta[]='{type(meta)}'") + logger.debug("meta[]='%s'", type(meta)) tag = meta.find("meta", attrs={"name": "csrf-token"}) - logger.debug(f"tag={tag}") + logger.debug("tag[%s]='%s'", type(tag), tag) if tag is not None: - logger.debug(f"Adding CSRF token='{tag['content']}' for domain='{domain}'") + logger.debug("Adding CSRF token='%s' for domain='%s'", tag["content"], domain) reqheaders["X-CSRF-Token"] = tag["content"] + elif not domain_helper.is_in_url(domain, response.url): + logger.warning("domain='%s' doesn't match with response.url='%s', maybe redirect to other domain?", domain, response.url) + message = f"Redirect from domain='{domain}' to response.url='{response.url}'" + instances.set_last_error(domain, message) + raise requests.exceptions.TooManyRedirects(message) - logger.debug(f"reqheaders()={len(reqheaders)} - EXIT!") + logger.debug("reqheaders()=%d - EXIT!", len(reqheaders)) return reqheaders