1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
28 from fba.http import network
30 from fba.models import instances
# Module-wide logging setup: create the logger used by every function in this
# module (named after the module itself) and request INFO-level configuration
# of the root logger. The two statements are independent of each other.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
35 ##### Other functions #####
def is_primitive(var: object) -> bool:
    """Check whether a value is one of the basic built-in scalar types.

    Args:
        var: Any value.

    Returns:
        True when ``var`` is ``None`` or its exact type is ``int``, ``str``,
        ``float`` or ``bool``; False otherwise. The exact type is compared,
        so instances of subclasses of those types are not reported as
        primitive.
    """
    logger.debug("var[]='%s' - CALLED!", type(var))

    # None is covered by the identity check. Listing None inside the set of
    # types (as was done before) could never match, because type(var) is
    # always a class and never the None object itself.
    return var is None or type(var) in {int, str, float, bool}
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of a domain name.

    The domain is first handed to ``domain_helper.raise_on()`` (presumably
    that helper raises for unusable input; verify against its definition),
    then encoded as UTF-8 and hashed.

    Args:
        domain: Domain name to hash.

    Returns:
        The SHA-256 digest as a hexadecimal string.
    """
    logger.debug("domain='%s' - CALLED!", domain)
    domain_helper.raise_on(domain)

    digest = hashlib.sha256()
    digest.update(domain.encode("utf-8"))
    return digest.hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Split a full URL into host and path and fetch it via ``network.fetch_response()``.

    Args:
        url: Complete, non-empty URL to fetch.
        headers: Headers passed through to ``network.fetch_response()``.
        timeout: Timeout tuple passed through to ``network.fetch_response()``.

    Returns:
        Whatever ``network.fetch_response()`` returns for the parsed URL.

    Raises:
        ValueError: If ``url`` is not a non-empty string, ``headers`` is not
            a dict or ``timeout`` is not a tuple.
    """
    # Validate before logging: the entry log line calls len() on headers and
    # timeout, which would raise TypeError for e.g. None and mask the
    # intended ValueError below.
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
    if url == "":
        raise ValueError("Parameter 'url' is empty")
    if not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
    if not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")

    logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)

    logger.debug("Parsing url='%s' ...", url)
    components = urlparse(url)
    logger.debug("components[%s]='%s'", type(components), components)

    # An URL without a path (e.g. "https://example.com?x=1") must still be
    # requested with a leading slash, with or without a query string.
    path = components.path or "/"

    # Only append the query when there is one, to avoid a trailing "?".
    if components.query != "":
        path = f"{path}?{components.query}"

    response = network.fetch_response(components.netloc, path, headers, timeout)

    logger.debug("response[]='%s' - EXIT!", type(response))
    return response
def _tag_domain(tag, name: str) -> str:
    """Return the tidied first content of child element ``name`` of ``tag``, or "" if there is none."""
    element = tag.find(name)

    # find() yields None when no child matches, and a matching element may
    # have no contents at all; both cases mean "no domain here".
    if element is None or not element.contents:
        return ""

    return tidyup.domain(element.contents[0])


def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
    """Collect wanted domain names from a set of parsed HTML tags.

    For every tag the first content of its ``search`` child element is
    tidied up and used as the domain; when that yields an empty string the
    ``<em>`` child is tried instead. Domains are converted to their IDNA
    form and kept only when ``domain_helper.is_wanted()`` accepts them.

    Args:
        tags: Result set of tags to inspect.
        search: Non-empty name of the child element holding the domain.

    Returns:
        List of IDNA-encoded domain names, in the order they were found.

    Raises:
        ValueError: If ``tags`` is not a ``ResultSet`` or ``search`` is not
            a non-empty string.
    """
    # Validate before logging: the entry log line calls len(tags), which
    # would raise TypeError for unsized input and mask the ValueError below.
    if not isinstance(tags, bs4.element.ResultSet):
        raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
    if not isinstance(search, str):
        raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
    if search == "":
        raise ValueError("Parameter 'search' is empty")

    logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)

    domains = []

    logger.debug("Parsing %d tags ...", len(tags))
    for tag in tags:
        logger.debug("tag[]='%s'", type(tag))
        domain = _tag_domain(tag, search)
        logger.debug("domain='%s' - AFTER!", domain)

        if domain == "":
            logger.debug("tag='%s' has no domain, trying <em> ...", tag)
            domain = _tag_domain(tag, "em")
            logger.debug("domain='%s' - AFTER!", domain)

        if domain == "":
            logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
            continue

        logger.debug("domain='%s' - BEFORE!", domain)
        domain = domain.encode("idna").decode("utf-8")
        logger.debug("domain='%s' - AFTER!", domain)

        if not domain_helper.is_wanted(domain):
            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
            continue

        logger.debug("Appending domain='%s'", domain)
        domains.append(domain)

    logger.debug("domains()=%d - EXIT!", len(domains))
    return domains
def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
    """Try to resolve an obfuscated domain name to its real name.

    A domain counts as obfuscated when it contains ``*`` or ``?``. In that
    case ``instances.deobfuscate()`` is asked for a matching row; on success
    the row's ``"domain"`` value replaces the obfuscated name, otherwise the
    blocker is flagged through ``instances.set_has_obfuscation()`` and the
    name is returned unchanged.

    Args:
        domain: Non-empty domain name, possibly containing wildcards.
        blocker: Domain of the blocking instance; passed to
            ``domain_helper.raise_on()`` first.
        domain_hash: Optional hash forwarded to ``instances.deobfuscate()``;
            may be ``None``.

    Returns:
        The de-obfuscated domain when a match was found, else ``domain``
        as given.

    Raises:
        ValueError: If ``domain`` is not a non-empty string or
            ``domain_hash`` is neither a string nor ``None``.
    """
    logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
    domain_helper.raise_on(blocker)

    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
    if domain == "":
        raise ValueError("Parameter domain is empty")
    if domain_hash is not None and not isinstance(domain_hash, str):
        raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")

    # "*" is checked before "?", so it wins when a name contains both.
    wildcard = next((char for char in ("*", "?") if char in domain), None)

    if wildcard is None:
        logger.debug("domain='%s' is not obfuscated", domain)
    else:
        logger.debug("blocker='%s' uses obfuscated domains", blocker)

        # Obscured domain name, look it up (the hash may be absent)
        row = instances.deobfuscate(wildcard, domain, domain_hash)

        logger.debug("row[]='%s'", type(row))
        # NOTE(review): a row of None is taken to mean "no match" - confirm
        # against instances.deobfuscate().
        if row is not None:
            logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
            domain = row["domain"]
        else:
            logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
            instances.set_has_obfuscation(blocker, True)

    logger.debug("domain='%s' - EXIT!", domain)
    return domain
def base_url() -> str:
    """Assemble the base URL from the 'scheme', 'hostname' and 'base_url' configuration entries.

    Returns:
        The string ``<scheme>://<hostname><base_url>`` built from the values
        returned by ``config.get()``.
    """
    scheme = config.get("scheme")
    hostname = config.get("hostname")
    path_prefix = config.get("base_url")

    return f"{scheme}://{hostname}{path_prefix}"