-# Copyright (C) 2023 Free Software Foundation
+ # Copyright (C) 2023 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
import bs4
import requests
-import validators
-from fba.helpers import blacklist
-from fba.helpers import cookies
+from fba.helpers import config
+from fba.helpers import domain as domain_helper
from fba.helpers import tidyup
-from fba.http import federation
from fba.http import network
from fba.models import instances
##### Other functions #####
def is_primitive(var: any) -> bool:
- logger.debug(f"var[]='{type(var)}' - CALLED!")
- return type(var) in {int, str, float, bool} or var is None
+ logger.debug("var[]='%s' - CALLED!", type(var))
+ # type() never returns the None object itself; NoneType (type(None)) is what a None value matches.
+ return type(var) in {int, str, float, bool, type(None)}
def get_hash(domain: str) -> str:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
- elif not validators.domain(domain.split("/")[0]):
- raise ValueError(f"domain='{domain}' is not a valid domain")
- elif domain.endswith(".arpa"):
- raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
- elif domain.endswith(".tld"):
- raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
- logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
+ # Log only types on entry: len() on an unvalidated argument could raise TypeError before the checks below run.
+ logger.debug("url='%s',headers[]='%s',timeout[]='%s' - CALLED!", url, type(headers), type(timeout))
+
if not isinstance(url, str):
- raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+ raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
elif url == "":
raise ValueError("Parameter 'url' is empty")
elif not isinstance(headers, dict):
- raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
+ raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
elif not isinstance(timeout, tuple):
- raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
+ raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
+
+ # Arguments are validated at this point, so their sizes can be logged safely.
+ logger.debug("headers()=%d,timeout(%d)='%s'", len(headers), len(timeout), timeout)
- logger.debug(f"Parsing url='{url}'")
+ logger.debug("Parsing url='%s' ...", url)
components = urlparse(url)
# Invoke other function, avoid trailing ?
- logger.debug(f"components[{type(components)}]={components}")
+ logger.debug("components[%s]='%s'", type(components), components)
if components.query != "":
- response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
+ response = network.fetch_response(components.netloc.split(":")[0], f"{components.path}?{components.query}", headers, timeout)
else:
- response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
+ response = network.fetch_response(components.netloc.split(":")[0], components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
- logger.debug(f"response[]='{type(response)}' - EXXIT!")
+ logger.debug("response[]='%s' - EXIT!", type(response))
return response
-def process_domain(domain: str, blocker: str, command: str) -> bool:
- logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
- elif not validators.domain(domain.split("/")[0]):
- raise ValueError(f"domain='{domain}' is not a valid domain")
- elif domain.endswith(".arpa"):
- raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
- elif domain.endswith(".tld"):
- raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
- elif not isinstance(blocker, str):
- raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
- elif blocker == "":
- raise ValueError("Parameter 'blocker' is empty")
- elif not validators.domain(blocker.split("/")[0]):
- raise ValueError(f"blocker='{blocker}' is not a valid domain")
- elif blocker.endswith(".arpa"):
- raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
- elif blocker.endswith(".tld"):
- raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
- elif not isinstance(command, str):
- raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
- elif command == "":
- raise ValueError("Parameter 'command' is empty")
-
- if domain.find("*") > 0:
- # Try to de-obscure it
- row = instances.deobscure("*", domain)
-
- logger.debug(f"row[{type(row)}]='{row}'")
- if row is None:
- logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
- return False
-
- logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
- domain = row[0]
- elif domain.find("?") > 0:
- # Try to de-obscure it
- row = instances.deobscure("?", domain)
-
- logger.debug(f"row[{type(row)}]='{row}'")
- if row is None:
- logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
- return False
-
- logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
- domain = row[0]
-
- if not is_domain_wanted(domain):
- logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
- return False
- elif instances.is_recent(domain):
- logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
- return False
-
- processed = False
- try:
- logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
- federation.fetch_instances(domain, blocker, None, command)
- processed = True
-
- logger.debug("Invoking cookies.clear(%s) ...", domain)
- cookies.clear(domain)
- except network.exceptions as exception:
- logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
- instances.set_last_error(domain, exception)
-
- logger.debug(f"processed='{processed}' - EXIT!")
- return processed
-
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
- logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
+ # len(tags) is deferred until tags has passed the type check below; the count is logged before the loop.
+ logger.debug("tags[]='%s',search='%s' - CALLED!", type(tags), search)
+
if not isinstance(tags, bs4.element.ResultSet):
- raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
+ raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
elif not isinstance(search, str):
- raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
+ raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
elif search == "":
raise ValueError("Parameter 'search' is empty")
domains = list()
+ logger.debug("Parsing %d tags ...", len(tags))
for tag in tags:
logger.debug("tag[]='%s'", type(tag))
domain = tidyup.domain(tag.find(search).contents[0])
+ logger.debug("domain='%s' - AFTER!", domain)
- logger.debug("domain='%s'", domain)
if domain == "":
logger.debug("tag='%s' has no domain, trying <em> ...", tag)
domain = tidyup.domain(tag.find("em").contents[0])
+ logger.debug("domain='%s' - AFTER!", domain)
- if not is_domain_wanted(domain):
- logger.debug("domain='%s' is not wanted - SKIPPED!")
+ if domain == "":
+ logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
+ continue
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = domain.encode("idna").decode("utf-8")
+ logger.debug("domain='%s' - AFTER!", domain)
+
+ if not domain_helper.is_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
logger.debug("Appending domain='%s'", domain)
logger.debug("domains()=%d - EXIT!", len(domains))
return domains
-def is_domain_wanted (domain: str) -> bool:
- logger.debug("domain='%s' - CALLED!", domain)
- wanted = True
+def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
+ logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
+ domain_helper.raise_on(blocker)
if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
- elif not validators.domain(domain.split("/")[0]):
- logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
- wanted = False
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
- wanted = False
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - settings False ...", domain)
- wanted = False
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - settings False ...", domain)
- wanted = False
-
- logger.debug("wanted='%s' - EXIT!", wanted)
- return wanted
+ raise ValueError("Parameter domain is empty")
+ elif not isinstance(domain_hash, str) and domain_hash is not None:
+ raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
+
+ logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
+ instances.set_has_obfuscation(blocker, False)
+
+ if domain.find("*") >= 0:
+ logger.debug("blocker='%s' uses obfuscated domains", blocker)
+ instances.set_has_obfuscation(blocker, True)
+
+ # Obscured domain name with no hash
+ row = instances.deobfuscate("*", domain, domain_hash)
+
+ logger.debug("row[]='%s'", type(row))
+ if row is not None:
+ logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
+ domain = row["domain"]
+ else:
+ logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
+ elif domain.find("?") >= 0:
+ logger.debug("blocker='%s' uses obfuscated domains", blocker)
+ instances.set_has_obfuscation(blocker, True)
+
+ # Obscured domain name with no hash
+ row = instances.deobfuscate("?", domain, domain_hash)
+
+ logger.debug("row[]='%s'", type(row))
+ if row is not None:
+ logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
+ domain = row["domain"]
+ else:
+ logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
+ else:
+ logger.debug("domain='%s' is not obfuscated", domain)
+
+ logger.debug("domain='%s' - EXIT!", domain)
+ return domain
+
+def base_url() -> str:
+ return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"