1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
29 from fba.http import federation
30 from fba.http import network
32 from fba.models import blocks
33 from fba.models import instances
# Configure the root logger at INFO level as soon as this module is imported.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
38 ##### Other functions #####
def is_primitive(var: object) -> bool:
    """Tell whether ``var`` is ``None`` or exactly an int, str, float or bool.

    The exact type is compared, so instances of subclasses of these types
    are not reported as primitive.

    Args:
        var: Value to inspect.

    Returns:
        ``True`` for ``None`` and for values whose type is exactly ``int``,
        ``str``, ``float`` or ``bool``; ``False`` otherwise.
    """
    logger.debug("var[]='%s' - CALLED!", type(var))

    # None is matched by identity: type(var) is always a class, so it can
    # never be equal to the None object itself.
    return var is None or type(var) in (int, str, float, bool)
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoded ``domain``.

    ``domain_helper.raise_on()`` is invoked on the value before hashing
    (presumably it rejects invalid domains - see that helper for details).

    Args:
        domain: Domain name to hash.

    Returns:
        The digest as a hexadecimal string.
    """
    logger.debug("domain='%s' - CALLED!", domain)
    domain_helper.raise_on(domain)

    encoded = domain.encode("utf-8")
    digest = hashlib.sha256()
    digest.update(encoded)

    return digest.hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Fetch ``url`` through ``network.fetch_response()``.

    The URL is split with ``urlparse()``. Only its network location, path
    and query string are forwarded; scheme and fragment are not passed on.

    Args:
        url: Non-empty URL to fetch.
        headers: Headers handed to ``network.fetch_response()``.
        timeout: Timeout tuple handed to ``network.fetch_response()``.

    Returns:
        Whatever ``network.fetch_response()`` returns.

    Raises:
        ValueError: If a parameter has the wrong type or ``url`` is empty.
    """
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")

    # Logged only after validation: len() on an unsized argument (e.g. None)
    # would raise TypeError before the ValueError checks above could run.
    logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)

    logger.debug("Parsing url='%s' ...", url)
    components = urlparse(url)

    logger.debug("components[%s]='%s'", type(components), components)

    # An empty path becomes "/" so the request path always has a leading slash,
    # with or without a query string.
    path = components.path if components.path != "" else "/"

    # Append the query only when there is one, avoiding a trailing "?"
    if components.query != "":
        path = f"{path}?{components.query}"

    response = network.fetch_response(components.netloc, path, headers, timeout)

    logger.debug("response[]='%s' - EXIT!", type(response))
    return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """Deobfuscate ``domain`` and fetch its instances if it is wanted and not recent.

    Args:
        domain: Domain to process; may be obfuscated.
        blocker: Domain of the instance that reported ``domain``.
        command: Non-empty name of the invoking command, passed on to
            ``federation.fetch_instances()``.

    Returns:
        ``True`` when ``federation.fetch_instances()`` finished without one
        of ``network.exceptions``; ``False`` when the domain was skipped or
        fetching failed.

    Raises:
        ValueError: If ``command`` is not a string or is empty.
    """
    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
    domain_helper.raise_on(domain)
    domain_helper.raise_on(blocker)

    if not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    logger.debug("domain='%s' - BEFORE!", domain)
    domain = deobfuscate_domain(domain, blocker)

    # Flush any pending updates recorded for the blocker.
    logger.debug("domain='%s' - DEOBFUSCATED!", domain)
    if instances.has_pending(blocker):
        logger.debug("Flushing updates for blocker='%s' ...", blocker)
        instances.update_data(blocker)

    if not is_domain_wanted(domain):
        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
        return False
    elif instances.is_recent(domain):
        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
        return False

    processed = False
    try:
        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
        federation.fetch_instances(domain, blocker, None, command)
        processed = True
    except network.exceptions as exception:
        # Best effort: record the failure on the instance and carry on.
        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
        instances.set_last_error(domain, exception)

    logger.debug("Checking if domain='%s' has pending updates ...", domain)
    if instances.has_pending(domain):
        logger.debug("Flushing updates for domain='%s' ...", domain)
        instances.update_data(domain)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def _first_content(tag, name: str) -> str:
    """Return the tidied first child of the first ``name`` element in ``tag``, or "" if there is none."""
    element = tag.find(name)
    if element is None or len(element.contents) == 0:
        return ""

    return tidyup.domain(element.contents[0])

def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
    """Collect the wanted domains found in ``tags``.

    For each tag the first child of its first ``search`` element is tidied
    with ``tidyup.domain()``. If that yields an empty string, the tag's
    first ``<em>`` element is tried instead. Tags without any usable
    domain and domains rejected by ``is_domain_wanted()`` are skipped.

    Args:
        tags: Result set of tags to scan.
        search: Non-empty name of the element holding the domain.

    Returns:
        List of wanted domains in the order they were found.

    Raises:
        ValueError: If a parameter has the wrong type or ``search`` is empty.
    """
    if not isinstance(tags, bs4.element.ResultSet):
        raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
    elif not isinstance(search, str):
        raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
    elif search == "":
        raise ValueError("Parameter 'search' is empty")

    # Logged only after validation because len() needs a sized argument.
    logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)

    domains = []
    logger.debug("Parsing %d tags ...", len(tags))
    for tag in tags:
        logger.debug("tag[]='%s'", type(tag))
        domain = _first_content(tag, search)

        logger.debug("domain='%s'", domain)
        if domain == "":
            logger.debug("tag='%s' has no domain, trying <em> ...", tag)
            domain = _first_content(tag, "em")

        if domain == "":
            # is_domain_wanted() raises on an empty string, so skip here.
            logger.debug("tag='%s' has no domain at all - SKIPPED!", tag)
            continue

        if not is_domain_wanted(domain):
            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
            continue

        logger.debug("Appending domain='%s'", domain)
        domains.append(domain)

    logger.debug("domains()=%d - EXIT!", len(domains))
    return domains
def is_domain_wanted(domain: str) -> bool:
    """Decide whether ``domain`` should be processed.

    A domain is unwanted when any of the following holds:
      * it contains upper-case characters,
      * the part before the first slash fails ``validators.domain()``,
      * it ends with ``.arpa``, ``.onion`` or ``.tld``,
      * ``blacklist.is_blacklisted()`` reports it,
      * it contains ``/profile/`` or ``/users/``, or contains ``/c/`` while
        its host part is a registered instance,
      * it contains ``/tag/``.

    Args:
        domain: Non-empty domain, optionally followed by a path.

    Returns:
        ``True`` when none of the rules above matched, ``False`` otherwise.

    Raises:
        ValueError: If ``domain`` is not a string or is empty.
    """
    logger.debug("domain='%s' - CALLED!", domain)

    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    # Everything before the first slash; used by two of the checks below.
    host = domain.split("/")[0]

    wanted = True
    if domain.lower() != domain:
        logger.debug("domain='%s' contains upper-case characters - setting False ...", domain)
        wanted = False
    elif not validators.domain(host):
        logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
        wanted = False
    elif domain.endswith(".arpa"):
        logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
        wanted = False
    elif domain.endswith(".onion"):
        logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
        wanted = False
    elif domain.endswith(".tld"):
        logger.debug("domain='%s' is a fake domain - settings False ...", domain)
        wanted = False
    elif blacklist.is_blacklisted(domain):
        logger.debug("domain='%s' is blacklisted - settings False ...", domain)
        wanted = False
    # The host part is a valid, non-empty domain at this point, so none of the
    # path markers below can occur at index 0 of the string.
    elif "/profile/" in domain or "/users/" in domain or (instances.is_registered(host) and "/c/" in domain):
        logger.debug("domain='%s' is a single user", domain)
        wanted = False
    elif "/tag/" in domain:
        logger.debug("domain='%s' is a tag", domain)
        wanted = False

    logger.debug("wanted='%s' - EXIT!", wanted)
    return wanted
def deobfuscate_domain(domain: str, blocker: str, domain_hash: str = None) -> str:
    """Try to resolve an obfuscated ``domain`` to its real name.

    A domain counts as obfuscated when it contains ``*`` or ``?``. The
    lookup is delegated to ``instances.deobfuscate()``. When no row is
    found, the blocker is flagged via ``instances.set_has_obfuscation()``
    and the domain is returned unchanged.

    Args:
        domain: Non-empty, possibly obfuscated domain.
        blocker: Domain of the instance that reported ``domain``.
        domain_hash: Optional hash passed on to the lookup.

    Returns:
        The deobfuscated domain if a row was found, otherwise ``domain``.

    Raises:
        ValueError: If ``domain`` is not a string or is empty, or
            ``domain_hash`` is neither a string nor ``None``.
    """
    logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
    domain_helper.raise_on(blocker)

    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
    elif domain == "":
        raise ValueError("Parameter domain is empty")
    elif not isinstance(domain_hash, str) and domain_hash is not None:
        raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")

    # "*" takes precedence over "?" when a domain contains both.
    wildcard = next((char for char in ("*", "?") if char in domain), None)

    if wildcard is None:
        logger.debug("domain='%s' is not obfuscated", domain)
    else:
        logger.debug("blocker='%s' uses obfuscated domains", blocker)

        # Obscured domain name, looked up with the optional hash
        row = instances.deobfuscate(wildcard, domain, domain_hash)

        logger.debug("row[]='%s'", type(row))
        if row is not None:
            logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
            domain = row["domain"]
        else:
            logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
            instances.set_has_obfuscation(blocker, True)

    logger.debug("domain='%s' - EXIT!", domain)
    return domain
def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
    """Record that ``blocker`` blocks ``blocked`` at ``block_level``.

    A new block is added through ``blocks.add_instance()`` when
    ``blocks.is_instance_blocked()`` reports none; otherwise only
    ``blocks.update_last_seen()`` is called.

    Args:
        blocker: Domain of the blocking instance.
        blocked: Domain of the blocked instance.
        reason: Block reason, or ``None`` when there is none.
        block_level: Non-empty block level.

    Returns:
        ``True`` when a new block was added, ``False`` when an existing
        block was refreshed.

    Raises:
        ValueError: If ``reason`` is neither a string nor ``None``, or
            ``block_level`` is not a string or is empty.
    """
    logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
    domain_helper.raise_on(blocker)
    domain_helper.raise_on(blocked)

    if not isinstance(reason, str) and reason is not None:
        raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
    elif not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    elif block_level == "":
        raise ValueError("Parameter block_level is empty")

    added = False
    if not blocks.is_instance_blocked(blocker, blocked, block_level):
        logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
        blocks.add_instance(blocker, blocked, reason, block_level)
        added = True
    else:
        # NOTE(review): the message mentions the reason, but only the
        # last-seen update is invoked here - confirm that is intended.
        logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
        blocks.update_last_seen(blocker, blocked, block_level)

    logger.debug("added='%s' - EXIT!", added)
    return added
def alias_block_level(block_level: str) -> str:
    """Translate a short block-level name into its long form.

    ``silence`` becomes ``silenced``, ``suspend`` becomes ``suspended`` and
    ``nsfw`` becomes ``media_nsfw``. Any other value is returned unchanged.

    Args:
        block_level: Non-empty block level to translate.

    Returns:
        The aliased block level, or ``block_level`` itself when no alias
        applies.

    Raises:
        ValueError: If ``block_level`` is not a string or is empty.
    """
    logger.debug("block_level='%s' - CALLED!", block_level)
    if not isinstance(block_level, str):
        # Interpolate the type into the message itself; a '%s' placeholder
        # plus a second argument is never formatted by ValueError.
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    elif block_level == "":
        raise ValueError("Parameter 'block_level' is empty")

    aliases = {
        "silence": "silenced",
        "suspend": "suspended",
        "nsfw": "media_nsfw",
    }

    alias = aliases.get(block_level)
    if alias is not None:
        logger.debug("Block level '%s' has been changed to '%s'", block_level, alias)
        block_level = alias

    logger.debug("block_level='%s' - EXIT!", block_level)
    return block_level