]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import blocks
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug("var[]='%s' - CALLED!", type(var))
42     return type(var) in {int, str, float, bool, None} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain='%s' - CALLED!", domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
60
61     logger.debug("Parsing url='%s' ...", url)
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug("components[%s]='%s'", type(components), components)
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug("response[]='%s' - EXIT!", type(response))
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78
79     if not isinstance(command, str):
80         raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
81     elif command == "":
82         raise ValueError("Parameter 'command' is empty")
83
84     logger.debug("domain='%s' - BEFORE!", domain)
85     domain = deobfuscate_domain(domain, blocker)
86
87     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
88     if instances.has_pending(blocker):
89         logger.debug("Flushing updates for blocker='%s' ...", blocker)
90         instances.update_data(blocker)
91
92     if not is_domain_wanted(domain):
93         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
94         return False
95     elif instances.is_recent(domain):
96         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
97         return False
98
99     processed = False
100     try:
101         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
102         federation.fetch_instances(domain, blocker, None, command)
103         processed = True
104     except network.exceptions as exception:
105         logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
106         instances.set_last_error(domain, exception)
107
108     logger.debug("Checking if domain='%s' has pending updates ...", domain)
109     if instances.has_pending(domain):
110         logger.debug("Flushing updates for domain='%s' ...", domain)
111         instances.update_data(domain)
112
113     logger.debug("processed='%s' - EXIT!", processed)
114     return processed
115
116 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
117     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
118     if not isinstance(tags, bs4.element.ResultSet):
119         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
120     elif not isinstance(search, str):
121         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
122     elif search == "":
123         raise ValueError("Parameter 'search' is empty")
124
125     domains = list()
126     logger.debug("Parsing %d tags ...", len(tags))
127     for tag in tags:
128         logger.debug("tag[]='%s'", type(tag))
129         domain = tidyup.domain(tag.find(search).contents[0])
130
131         logger.debug("domain='%s'", domain)
132         if domain == "":
133             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
134             domain = tidyup.domain(tag.find("em").contents[0])
135
136         if not is_domain_wanted(domain):
137             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
138             continue
139
140         logger.debug("Appending domain='%s'", domain)
141         domains.append(domain)
142
143     logger.debug("domains()=%d - EXIT!", len(domains))
144     return domains
145
146 def is_domain_wanted(domain: str) -> bool:
147     logger.debug("domain='%s' - CALLED!", domain)
148
149     wanted = True
150     if not isinstance(domain, str):
151         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
152     elif domain == "":
153         raise ValueError("Parameter 'domain' is empty")
154     elif domain.lower() != domain:
155         wanted = False
156     elif not validators.domain(domain.split("/")[0]):
157         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
158         wanted = False
159     elif domain.endswith(".arpa"):
160         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
161         wanted = False
162     elif domain.endswith(".onion"):
163         logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
164         wanted = False
165     elif domain.endswith(".tld"):
166         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
167         wanted = False
168     elif blacklist.is_blacklisted(domain):
169         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
170         wanted = False
171     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
172         logger.debug("domain='%s' is a single user", domain)
173         wanted = False
174     elif domain.find("/tag/") > 0:
175         logger.debug("domain='%s' is a tag", domain)
176         wanted = False
177
178     logger.debug("wanted='%s' - EXIT!", wanted)
179     return wanted
180
181 def deobfuscate_domain(domain: str, blocker: str, domain_hash: str = None) -> str:
182     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
183     domain_helper.raise_on(blocker)
184
185     if not isinstance(domain, str):
186         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
187     elif domain == "":
188         raise ValueError("Parameter domain is empty")
189     elif not isinstance(domain_hash, str) and domain_hash is not None:
190         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
191
192     if domain.find("*") >= 0:
193         logger.debug("blocker='%s' uses obfuscated domains", blocker)
194
195         # Obscured domain name with no hash
196         row = instances.deobfuscate("*", domain, domain_hash)
197
198         logger.debug("row[]='%s'", type(row))
199         if row is not None:
200             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
201             domain = row["domain"]
202         else:
203             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
204             instances.set_has_obfuscation(blocker, True)
205     elif domain.find("?") >= 0:
206         logger.debug("blocker='%s' uses obfuscated domains", blocker)
207
208         # Obscured domain name with no hash
209         row = instances.deobfuscate("?", domain, domain_hash)
210
211         logger.debug("row[]='%s'", type(row))
212         if row is not None:
213             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
214             domain = row["domain"]
215         else:
216             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
217             instances.set_has_obfuscation(blocker, True)
218     else:
219         logger.debug("domain='%s' is not obfuscated", domain)
220
221     logger.debug("domain='%s' - EXIT!", domain)
222     return domain
223
224 def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
225     logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
226     domain_helper.raise_on(blocker)
227     domain_helper.raise_on(blocked)
228
229     added = False
230     if not isinstance(reason, str) and reason is not None:
231         raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
232     elif not isinstance(block_level, str):
233         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
234     elif block_level == "":
235         raise ValueError("Parameter block_level is empty")
236
237     if not blocks.is_instance_blocked(blocker, blocked, block_level):
238         logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
239         blocks.add_instance(blocker, blocked, reason, block_level)
240         added = True
241     else:
242         logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
243         blocks.update_last_seen(blocker, blocked, block_level)
244
245     logger.debug("added='%s' - EXIT!", added)
246     return added
247
248 def alias_block_level(block_level: str) -> str:
249     logger.debug("block_level='%s' - CALLED!", block_level)
250     if not isinstance(block_level, str):
251         raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
252     elif block_level == "":
253         raise ValueError("Parameter 'block_level' is empty")
254
255     if block_level == "silence":
256         logger.debug("Block level 'silence' has been changed to 'silenced'")
257         block_level = "silenced"
258     elif block_level == "suspend":
259         logger.debug("Block level 'suspend' has been changed to 'suspended'")
260         block_level = "suspended"
261     elif block_level == "nsfw":
262         logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
263         block_level = "media_nsfw"
264
265     logger.debug("block_level='%s' - EXIT!", block_level)
266     return block_level