1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
25 from fba.helpers import blacklist
26 from fba.helpers import cookies
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
30 from fba.http import federation
31 from fba.http import network
33 from fba.models import instances
# Configure the root logger at INFO level when this module is imported and
# create the module-level logger used by every function below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
38 ##### Other functions #####
def is_primitive(var: any) -> bool:
    """Tell whether ``var`` is ``None`` or exactly an int, str, float or bool.

    The comparison is made on ``type(var)`` itself, so instances of
    subclasses of these four types are not reported as primitive.

    Args:
        var: Any value.

    Returns:
        True for ``None`` and for values whose exact type is one of the
        four listed above, otherwise False.
    """
    logger.debug(f"var[]='{type(var)}' - CALLED!")

    exact_type = type(var)
    if exact_type in {int, str, float, bool}:
        return True

    return var is None
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoded ``domain``.

    Args:
        domain: Domain name; it is handed to ``domain_helper.raise_on()``
            before being hashed.

    Returns:
        The digest as a hexadecimal string.
    """
    logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
    domain_helper.raise_on(domain)

    encoded = domain.encode("utf-8")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Fetch ``url`` through ``network.fetch_response()`` and return the result.

    The URL is split with ``urlparse()``. The network location is passed as
    the first argument and the path (with the query string appended when one
    is present) as the second one.

    Args:
        url: Non-empty URL string.
        headers: Headers, handed to ``network.fetch_response()`` unchanged.
        timeout: Timeout tuple, handed to ``network.fetch_response()`` unchanged.

    Returns:
        Whatever ``network.fetch_response()`` returns.

    Raises:
        ValueError: If ``url`` is not a non-empty ``str``, ``headers`` is not
            a ``dict`` or ``timeout`` is not a ``tuple``.
    """
    # Validate first: the entry log below calls len(headers), which would
    # raise a TypeError for e.g. None and mask the intended ValueError.
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")

    logger.debug("url='%s',headers()=%d,timeout=%s - CALLED!", url, len(headers), timeout)

    logger.debug("Parsing url='%s'", url)
    components = urlparse(url)

    # Invoke other function, avoid trailing ?
    logger.debug("components[%s]=%s", type(components), components)
    if components.query != "":
        path = f"{components.path}?{components.query}"
    else:
        # An empty path (e.g. "https://example.org") is requested as "/"
        path = components.path or "/"

    response = network.fetch_response(components.netloc, path, headers, timeout)

    logger.debug("response[]='%s' - EXXIT!", type(response))
    return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """Fetch the instances of ``domain`` unless it is unwanted or recently checked.

    A domain containing an obscuring character (``*`` or ``?``) is first
    looked up with ``instances.deobscure()`` and replaced by the first column
    of the returned row.

    Args:
        domain: Domain to process; may contain ``*`` or ``?``.
        blocker: Domain that reported ``domain``.
        command: Non-empty name of the invoking command, passed on to
            ``federation.fetch_instances()``.

    Returns:
        True when ``federation.fetch_instances()`` finished without raising
        one of ``network.exceptions``. False when the domain could not be
        de-obscured, is not wanted, was recently checked, or the fetch failed.

    Raises:
        ValueError: If ``command`` is not a non-empty ``str``.
    """
    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
    domain_helper.raise_on(domain)
    domain_helper.raise_on(blocker)
    if not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    # "*" takes precedence over "?"; only the first matching character is tried.
    # A plain containment test is used so that an obscuring character at the
    # very first position is handled too (find() > 0 skipped index 0).
    for char in ("*", "?"):
        if char not in domain:
            continue

        # Try to de-obscure it
        row = instances.deobscure(char, domain)

        logger.debug("row[%s]='%s'", type(row), row)
        # NOTE(review): assumes deobscure() returns None when nothing matched - confirm
        if row is None:
            logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
            return False

        logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
        domain = row[0]
        break

    if not is_domain_wanted(domain):
        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
        return False
    elif instances.is_recent(domain):
        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
        return False

    processed = False
    try:
        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
        federation.fetch_instances(domain, blocker, None, command)
        processed = True

        logger.debug("Invoking cookies.clear(%s) ...", domain)
        cookies.clear(domain)
    except network.exceptions as exception:
        # Report the actual command instead of a hard-coded "fetch_oliphant"
        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
        instances.set_last_error(domain, exception)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
    """Collect the wanted domains found in ``tags``.

    For every tag the first content item of its ``search`` child is cleaned
    with ``tidyup.domain()``. When that yields nothing, the ``<em>`` child is
    tried instead. Domains rejected by ``is_domain_wanted()`` are skipped.

    Args:
        tags: Result set of tags to inspect.
        search: Non-empty name of the child element holding the domain.

    Returns:
        List of wanted domains in the order they were found.

    Raises:
        ValueError: If ``tags`` is not a ``ResultSet`` or ``search`` is not a
            non-empty ``str``.
    """
    # Validate first: the entry log below calls len(tags), which would raise
    # a TypeError for unsized values and mask the intended ValueError.
    if not isinstance(tags, bs4.element.ResultSet):
        raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
    elif not isinstance(search, str):
        raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
    elif search == "":
        raise ValueError("Parameter 'search' is empty")

    logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)

    domains = []
    for tag in tags:
        logger.debug("tag[]='%s'", type(tag))
        domain = tidyup.domain(tag.find(search).contents[0])

        logger.debug("domain='%s'", domain)
        # NOTE(review): assumes tidyup.domain() returns an empty value when
        # the element holds no domain - confirm
        if not domain:
            logger.debug("tag='%s' has no domain, trying <em> ...", tag)
            domain = tidyup.domain(tag.find("em").contents[0])

        if not is_domain_wanted(domain):
            # The domain argument was missing here, leaving a literal '%s' in the log
            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
            continue

        logger.debug("Appending domain='%s'", domain)
        domains.append(domain)

    logger.debug("domains()=%d - EXIT!", len(domains))
    return domains
def is_domain_wanted(domain: str) -> bool:
    """Decide whether ``domain`` should be processed.

    A domain is not wanted when the part before the first ``/`` is rejected
    by ``validators.domain()``, when it ends with ``.arpa`` or ``.tld``, or
    when ``blacklist.is_blacklisted()`` reports it.

    Args:
        domain: Non-empty, all lower-case domain name.

    Returns:
        True when none of the exclusion rules applies, otherwise False.

    Raises:
        ValueError: If ``domain`` is not a non-empty ``str`` or contains
            upper-case characters.
    """
    logger.debug("domain='%s' - CALLED!", domain)

    wanted = True
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif domain.lower() != domain:
        # NOTE(review): the body of this branch was not visible in the
        # reviewed text; raising matches the other parameter checks - confirm
        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
    elif not validators.domain(domain.split("/")[0]):
        logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
        wanted = False
    elif domain.endswith(".arpa"):
        logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
        wanted = False
    elif domain.endswith(".tld"):
        logger.debug("domain='%s' is a fake domain - settings False ...", domain)
        wanted = False
    elif blacklist.is_blacklisted(domain):
        logger.debug("domain='%s' is blacklisted - settings False ...", domain)
        wanted = False

    logger.debug("wanted='%s' - EXIT!", wanted)
    return wanted