1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
25 from fba.helpers import blacklist
26 from fba.helpers import cookies
27 from fba.helpers import tidyup
29 from fba.http import federation
30 from fba.http import network
32 from fba.models import instances
# Module-wide logging setup: basicConfig() is invoked when this module is
# imported and sets the root logger's level to INFO; the functions below
# write their trace output through this module-named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
37 ##### Other functions #####
def is_primitive(var: object) -> bool:
    """Tell whether a value is None or exactly an int, str, float or bool.

    Args:
        var: Any value.

    Returns:
        True if var is None or its exact type is int, str, float or bool,
        otherwise False.
    """
    logger.debug("var[]='%s' - CALLED!", type(var))

    # The exact type is compared on purpose: instances of subclasses of the
    # listed types are not reported as primitive.
    return var is None or type(var) in {int, str, float, bool}
def get_hash(domain: str) -> str:
    """Return the SHA-256 hex digest of a validated domain name.

    Args:
        domain: Lower-case domain name. Only the part before the first "/"
            is checked with validators.domain().

    Returns:
        Hexadecimal SHA-256 digest of the UTF-8 encoded domain string.

    Raises:
        ValueError: If domain is not a str, is empty, contains upper-case
            characters, fails validators.domain(), or ends with ".arpa"
            or ".tld".
    """
    # The type check must come before anything calls len() on the argument,
    # otherwise a non-string surfaces as TypeError instead of ValueError.
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")

    logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
    if domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif domain.lower() != domain:
        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
    elif not validators.domain(domain.split("/")[0]):
        raise ValueError(f"domain='{domain}' is not a valid domain")
    elif domain.endswith(".arpa"):
        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
    elif domain.endswith(".tld"):
        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")

    return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Fetch a URL by delegating to network.fetch_response().

    The URL is split with urlparse(); its network location and its path
    (followed by "?" and the query string when one is present) are handed
    to network.fetch_response() together with headers and timeout.

    Args:
        url: URL to fetch, must be a non-empty string.
        headers: Request headers, passed through unchanged.
        timeout: Timeout tuple, passed through unchanged.

    Returns:
        The value returned by network.fetch_response().

    Raises:
        ValueError: If url is not a str or is empty, headers is not a dict,
            or timeout is not a tuple.
    """
    # Validate first: logging len(headers) before the type checks would turn
    # a wrong argument type into a TypeError instead of the ValueError below.
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")

    logger.debug("url='%s',headers()=%d,timeout=%s - CALLED!", url, len(headers), timeout)

    logger.debug("Parsing url='%s'", url)
    components = urlparse(url)
    logger.debug("components[%s]=%s", type(components), components)

    # An empty path is requested as "/", also when a query string follows.
    path = components.path or "/"

    # Only append the query when there is one, to avoid a trailing "?".
    if components.query != "":
        path = f"{path}?{components.query}"

    response = network.fetch_response(components.netloc, path, headers, timeout)

    logger.debug("response[]='%s' - EXIT!", type(response))
    return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """Fetch instances from a domain that was found on a blocker's list.

    A domain containing "*" or "?" is first passed to instances.deobscure().
    The resulting domain is skipped when is_domain_wanted() rejects it or
    instances.is_recent() reports it. Otherwise
    federation.fetch_instances() is invoked and the domain's cookies are
    cleared afterwards.

    Args:
        domain: Lower-case domain name to process.
        blocker: Domain name of the instance the domain was taken from.
        command: Name of the invoking command, passed on to
            federation.fetch_instances().

    Returns:
        True if federation.fetch_instances() completed without raising one
        of network.exceptions, False if the domain was skipped or the fetch
        failed.

    Raises:
        ValueError: If a parameter has the wrong type or is empty, if domain
            is not all lower-case, or if domain or blocker fails
            validators.domain() or ends with ".arpa" or ".tld".
    """
    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif domain.lower() != domain:
        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
    elif not validators.domain(domain.split("/")[0]):
        raise ValueError(f"domain='{domain}' is not a valid domain")
    elif domain.endswith(".arpa"):
        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
    elif domain.endswith(".tld"):
        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
    elif not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif not validators.domain(blocker.split("/")[0]):
        raise ValueError(f"blocker='{blocker}' is not a valid domain")
    elif blocker.endswith(".arpa"):
        raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
    elif blocker.endswith(".tld"):
        raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
    elif not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    # NOTE(review): validators.domain() above presumably rejects names that
    # contain "*" or "?", which would make this de-obscuring step unreachable;
    # the "> 0" test also ignores a wildcard in first position - confirm both.
    for wildcard in ("*", "?"):
        if domain.find(wildcard) > 0:
            # Try to de-obscure it
            row = instances.deobscure(wildcard, domain)

            logger.debug("row[%s]='%s'", type(row), row)
            if row is None:
                logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
                return False

            logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
            domain = row[0]

            # Only the first wildcard kind found is handled.
            break

    if not is_domain_wanted(domain):
        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
        return False
    elif instances.is_recent(domain):
        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
        return False

    processed = False
    try:
        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
        federation.fetch_instances(domain, blocker, None, command)

        # Reached only when the fetch did not raise.
        processed = True

        logger.debug("Invoking cookies.clear(%s) ...", domain)
        cookies.clear(domain)
    except network.exceptions as exception:
        # Report the command that actually triggered the fetch.
        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
        instances.set_last_error(domain, exception)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
    """Collect the wanted domain names found in a set of parsed tags.

    For every tag, the first child of tag.find(search) is cleaned with
    tidyup.domain(). If that yields an empty string, the first child of the
    tag's <em> element is tried instead. Domains rejected by
    is_domain_wanted() are skipped.

    Args:
        tags: Result set of tags to inspect.
        search: Name handed to tag.find() to locate the domain element,
            must not be empty.

    Returns:
        List of domain names in the order they were found.

    Raises:
        ValueError: If tags is not a ResultSet, or search is not a str or
            is empty.
    """
    # Validate before logging len(tags), so a wrong type is reported as
    # ValueError rather than a TypeError raised by len().
    if not isinstance(tags, bs4.element.ResultSet):
        raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
    elif not isinstance(search, str):
        raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
    elif search == "":
        raise ValueError("Parameter 'search' is empty")

    logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)

    domains = []
    for tag in tags:
        logger.debug("tag[]='%s'", type(tag))
        # NOTE(review): presumably tag.find() yields None when nothing
        # matches, which would raise AttributeError here - confirm whether
        # such tags can occur.
        domain = tidyup.domain(tag.find(search).contents[0])

        logger.debug("domain='%s'", domain)
        if domain == "":
            logger.debug("tag='%s' has no domain, trying <em> ...", tag)
            domain = tidyup.domain(tag.find("em").contents[0])

        if not is_domain_wanted(domain):
            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
            continue

        logger.debug("Appending domain='%s'", domain)
        domains.append(domain)

    logger.debug("domains()=%d - EXIT!", len(domains))
    return domains
def is_domain_wanted(domain: str) -> bool:
    """Decide whether a domain should be processed.

    Args:
        domain: Lower-case domain name. Only the part before the first "/"
            is checked with validators.domain().

    Returns:
        False if the domain fails validators.domain(), ends with ".arpa" or
        ".tld", or blacklist.is_blacklisted() reports it; True otherwise.

    Raises:
        ValueError: If domain is not a str, is empty, or contains
            upper-case characters.
    """
    logger.debug("domain='%s' - CALLED!", domain)

    # Malformed arguments are caller errors and raise; an unwanted but
    # well-formed domain only yields False.
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif domain.lower() != domain:
        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")

    wanted = True
    if not validators.domain(domain.split("/")[0]):
        logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
        wanted = False
    elif domain.endswith(".arpa"):
        logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
        wanted = False
    elif domain.endswith(".tld"):
        logger.debug("domain='%s' is a fake domain - settings False ...", domain)
        wanted = False
    elif blacklist.is_blacklisted(domain):
        logger.debug("domain='%s' is blacklisted - settings False ...", domain)
        wanted = False

    logger.debug("wanted='%s' - EXIT!", wanted)
    return wanted