import logging

import bs4
import reqto
import requests

from fba.helpers import config
from fba.helpers import cookies
from fba.helpers import domain as domain_helper
from fba.http import network
from fba.models import instances

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def determine(domain: str, headers: dict) -> dict:
    """Fetch the instance's / page and return a copy of *headers* augmented
    with an ``X-CSRF-Token`` header when the page advertises one via a
    ``<meta name="csrf-token">`` tag.

    Parameters:
        domain  -- instance domain; validated by domain_helper.raise_on()
        headers -- base request headers to extend

    Returns:
        dict -- the (possibly extended) request headers

    Raises:
        ValueError -- when parameters are invalid
        requests.exceptions.TooManyRedirects -- when the response URL does
            not belong to *domain* (redirect to a different host)
    """
    logger.debug("domain='%s',headers()=%d - CALLED!", domain, len(headers))
    domain_helper.raise_on(domain)

    if not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")

    # Default headers with no CSRF
    reqheaders = headers

    # Fetch / to check for meta tag indicating csrf
    logger.debug("Fetching / from domain='%s' for CSRF check ...", domain)
    response = reqto.get(
        f"https://{domain}/",
        headers=network.web_headers,
    )

    logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
    if response.ok and response.status_code < 300 and response.text.strip() != "" and response.text.find("<html") > 0 and domain_helper.is_in_url(domain, response.url):
        # Save cookies
        logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
        cookies.store(domain, response.cookies.get_dict())

        # Parse text
        meta = bs4.BeautifulSoup(
            response.text,
            "html.parser"
        )
        logger.debug("meta[]='%s'", type(meta))
        tag = meta.find("meta", attrs={"name": "csrf-token"})
        logger.debug("tag[%s]='%s'", type(tag), tag)

        if tag is not None:
            logger.debug("Adding CSRF token='%s' for domain='%s'", tag["content"], domain)
            reqheaders["X-CSRF-Token"] = tag["content"]
    elif not domain_helper.is_in_url(domain, response.url):
        # Response came from a different host: treat as a redirect and abort
        # so the crawler does not attribute foreign content to this domain.
        logger.warning("domain='%s' doesn't match with response.url='%s', maybe redirect to other domain?", domain, response.url)
        message = f"Redirect from domain='{domain}' to response.url='{response.url}'"
        instances.set_last_error(domain, message)
        raise requests.exceptions.TooManyRedirects(message)

    logger.debug("reqheaders()=%d - EXIT!", len(reqheaders))
    return reqheaders