1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
29 from fba.http import federation
30 from fba.http import network
32 from fba.models import instances
# Configure the root logger (level INFO) when this module is imported and
# create the module-level logger used by the functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
37 ##### Other functions #####
def is_primitive(var: object) -> bool:
    """Tell whether ``var`` is ``None`` or an exact int, str, float or bool.

    The check compares ``type(var)`` itself, so instances of subclasses of
    these types are not counted as primitive.

    Args:
        var: Any value.

    Returns:
        ``True`` for ``None`` and for values whose exact type is ``int``,
        ``str``, ``float`` or ``bool``; ``False`` otherwise.
    """
    logger.debug("var[]='%s' - CALLED!", type(var))
    return var is None or type(var) in {int, str, float, bool}
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoded ``domain``.

    Args:
        domain: Domain name; it is first passed to ``domain_helper.raise_on``
            (defined outside this file — presumably raises on invalid input).

    Returns:
        The SHA-256 digest of ``domain`` as a hexadecimal string.
    """
    logger.debug("domain='%s' - CALLED!", domain)
    domain_helper.raise_on(domain)

    encoded = domain.encode("utf-8")
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Fetch ``url`` by delegating to ``network.fetch_response``.

    The URL is split with ``urllib.parse.urlparse``. Its network location is
    passed as the first argument and its path (followed by ``?query`` when a
    query string is present) as the second.

    Args:
        url: Non-empty URL string.
        headers: Request headers, handed to ``network.fetch_response`` as-is.
        timeout: Timeout tuple, handed to ``network.fetch_response`` as-is.

    Returns:
        Whatever ``network.fetch_response`` returns.

    Raises:
        ValueError: If ``url`` is not a non-empty ``str``, ``headers`` is not
            a ``dict`` or ``timeout`` is not a ``tuple``.
    """
    # Validate before logging: the entry log calls len() on headers/timeout,
    # which would raise TypeError (e.g. for None) instead of the ValueError
    # this function promises for bad parameters.
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")

    logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)

    logger.debug("Parsing url='%s' ...", url)
    components = urlparse(url)
    logger.debug("components[%s]='%s'", type(components), components)

    # An empty path is requested as "/", with or without a query string.
    path = components.path or "/"

    # Only append the query when there is one, to avoid a trailing "?".
    if components.query != "":
        path = f"{path}?{components.query}"

    response = network.fetch_response(components.netloc, path, headers, timeout)

    logger.debug("response[]='%s' - EXIT!", type(response))
    return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """De-obfuscate ``domain`` if necessary and fetch its instances.

    Steps, in order:
      1. If ``domain`` contains ``*`` or ``?``, mark ``blocker`` as using
         obfuscation and try ``instances.deobfuscate``; otherwise mark
         ``blocker`` as not using obfuscation.
      2. Flush pending updates for ``blocker``.
      3. Skip domains that ``is_domain_wanted`` rejects or that
         ``instances.is_recent`` reports as recently checked.
      4. Call ``federation.fetch_instances`` and afterwards flush pending
         updates for the domain.

    Args:
        domain: Possibly obfuscated domain name taken from a block list.
        blocker: Domain of the instance that published the block list.
        command: Non-empty name of the invoking command; passed on to
            ``federation.fetch_instances``.

    Returns:
        ``True`` when ``federation.fetch_instances`` finished without raising
        one of ``network.exceptions``. ``False`` when the domain could not be
        de-obfuscated, is not wanted, was recently checked, or fetching
        raised one of ``network.exceptions``.

    Raises:
        ValueError: If ``command`` is not a non-empty ``str``. ``domain`` and
            ``blocker`` are checked by ``domain_helper.raise_on`` (defined
            outside this file).
    """
    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
    domain_helper.raise_on(domain)
    domain_helper.raise_on(blocker)

    if not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    logger.debug("domain='%s' - BEFORE!", domain)

    # "*" takes precedence over "?". A membership test is used so that a
    # wildcard in the very first position (e.g. "*.example.com") is detected.
    wildcard = next((char for char in ("*", "?") if char in domain), None)

    if wildcard is None:
        logger.debug("blocker='%s' has NO obfuscation on their block list", blocker)
        instances.set_has_obfuscation(blocker, False)
    else:
        logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
        instances.set_has_obfuscation(blocker, True)

        # Try to de-obscure it
        row = instances.deobfuscate(wildcard, domain)

        logger.debug("row[%s]='%s'", type(row), row)
        if row is None:
            logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
            return False

        logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
        domain = row[0]

    logger.debug("domain='%s' - DEOBFUSCATED!", domain)
    if instances.has_pending(blocker):
        logger.debug("Flushing updates for blocker='%s' ...", blocker)
        instances.update_data(blocker)

    if not is_domain_wanted(domain):
        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
        return False
    elif instances.is_recent(domain):
        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
        return False

    processed = False
    try:
        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
        federation.fetch_instances(domain, blocker, None, command)
        processed = True
    except network.exceptions as exception:
        # Record the failure on the domain and carry on; the caller only
        # learns about it through the False return value.
        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
        instances.set_last_error(domain, exception)

    logger.debug("Checking if domain='%s' has pending updates ...", domain)
    if instances.has_pending(domain):
        logger.debug("Flushing updates for domain='%s' ...", domain)
        instances.update_data(domain)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
    """Collect the wanted domains found in a set of parsed HTML tags.

    For every tag, the first content item of its first ``search`` child is
    passed through ``tidyup.domain``. When that yields an empty string, the
    tag's first ``<em>`` child is tried instead. Domains rejected by
    ``is_domain_wanted`` are skipped.

    Args:
        tags: Result set of tags to scan.
        search: Non-empty name of the child element that holds the domain.

    Returns:
        List of wanted domains, in the order of ``tags``.

    Raises:
        ValueError: If ``tags`` is not a ``bs4.element.ResultSet`` or
            ``search`` is not a non-empty ``str``.
    """
    # Validate before logging: the entry log calls len(tags), which would
    # raise TypeError for a non-sized argument instead of the ValueError below.
    if not isinstance(tags, bs4.element.ResultSet):
        raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
    elif not isinstance(search, str):
        raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
    elif search == "":
        raise ValueError("Parameter 'search' is empty")

    logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)

    domains = []
    logger.debug("Parsing %d tags ...", len(tags))
    for tag in tags:
        logger.debug("tag[]='%s'", type(tag))
        domain = tidyup.domain(tag.find(search).contents[0])

        logger.debug("domain='%s'", domain)
        if domain == "":
            logger.debug("tag='%s' has no domain, trying <em> ...", tag)
            domain = tidyup.domain(tag.find("em").contents[0])

        if not is_domain_wanted(domain):
            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
            continue

        logger.debug("Appending domain='%s'", domain)
        domains.append(domain)

    logger.debug("domains()=%d - EXIT!", len(domains))
    return domains
def is_domain_wanted(domain: str) -> bool:
    """Decide whether ``domain`` should be processed.

    A domain is NOT wanted when any of the following holds (checked in this
    order; the first match wins):
      * it contains upper-case characters,
      * the part before the first ``/`` fails ``validators.domain``,
      * it ends with ``.arpa`` or ``.tld``,
      * ``blacklist.is_blacklisted`` reports it,
      * it contains ``/profile/`` or ``/users/``, or it contains ``/c/`` and
        the part before the first ``/`` is a registered instance,
      * it contains ``/tag/``.

    Args:
        domain: Non-empty domain name, optionally followed by a path.

    Returns:
        ``True`` when none of the rejection rules matched, else ``False``.

    Raises:
        ValueError: If ``domain`` is not a non-empty ``str``.
    """
    logger.debug("domain='%s' - CALLED!", domain)

    wanted = True
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif domain.lower() != domain:
        logger.debug("domain='%s' is not all lower-case - settings False ...", domain)
        wanted = False
    elif not validators.domain(domain.split("/")[0]):
        logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
        wanted = False
    elif domain.endswith(".arpa"):
        logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
        wanted = False
    elif domain.endswith(".tld"):
        logger.debug("domain='%s' is a fake domain - settings False ...", domain)
        wanted = False
    elif blacklist.is_blacklisted(domain):
        logger.debug("domain='%s' is blacklisted - settings False ...", domain)
        wanted = False
    # The cheap substring test runs before the instances.is_registered lookup.
    # A marker at index 0 cannot reach this point because the host part would
    # be empty and fail the validators.domain check above.
    elif "/profile/" in domain or "/users/" in domain or ("/c/" in domain and instances.is_registered(domain.split("/")[0])):
        logger.debug("domain='%s' is a single user", domain)
        wanted = False
    elif "/tag/" in domain:
        logger.debug("domain='%s' is a tag", domain)
        wanted = False

    logger.debug("wanted='%s' - EXIT!", wanted)
    return wanted
def deobfuscate_domain(domain: str, blocker: str) -> str:
    """Return the de-obfuscated form of ``domain`` when one can be found.

    When ``domain`` contains ``*`` or ``?`` (``*`` takes precedence),
    ``blocker`` is marked as using obfuscation and ``instances.deobfuscate``
    is asked for a match. If it returns a row, the row's first element
    replaces ``domain``; otherwise ``domain`` is returned unchanged.

    Args:
        domain: Possibly obfuscated domain name.
        blocker: Domain of the instance that published ``domain``.

    Returns:
        The de-obfuscated domain, or the original ``domain`` when it is not
        obfuscated or no match was found.

    Raises:
        Whatever ``domain_helper.raise_on`` raises for ``domain`` or
        ``blocker`` (defined outside this file).
    """
    logger.debug("domain='%s',blocker='%s' - CALLED!", domain, blocker)
    domain_helper.raise_on(domain)
    domain_helper.raise_on(blocker)

    # First obfuscation character present in the domain, "*" before "?".
    wildcard = next((char for char in ("*", "?") if char in domain), None)

    if wildcard is None:
        logger.debug("domain='%s' is not obfuscated", domain)
    else:
        logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
        instances.set_has_obfuscation(blocker, True)

        # Obscured domain name with no hash
        row = instances.deobfuscate(wildcard, domain)

        logger.debug("row[]='%s'", type(row))
        if row is not None:
            logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
            domain = row[0]

    logger.debug("domain='%s' - EXIT!", domain)
    return domain