]> git.mxchange.org Git - fba.git/blob - fba/utils.py
600763558b8b9943aa4397c32c3345ae2863a379
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import blocks
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug("var[]='%s' - CALLED!", type(var))
42     return type(var) in {int, str, float, bool, None} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain='%s' - CALLED!", domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
60
61     logger.debug("Parsing url='%s' ...", url)
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug("components[%s]='%s'", type(components), components)
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug("response[]='%s' - EXIT!", type(response))
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78
79     if not isinstance(command, str):
80         raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
81     elif command == "":
82         raise ValueError("Parameter 'command' is empty")
83
84     logger.debug("domain='%s' - BEFORE!", domain)
85     domain = deobfuscate(domain, blocker)
86
87     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
88     if instances.has_pending(blocker):
89         logger.debug("Flushing updates for blocker='%s' ...", blocker)
90         instances.update_data(blocker)
91
92     if not is_domain_wanted(domain):
93         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
94         return False
95     elif instances.is_recent(domain):
96         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
97         return False
98
99     processed = False
100     try:
101         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
102         federation.fetch_instances(domain, blocker, None, command)
103         processed = True
104     except network.exceptions as exception:
105         logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
106         instances.set_last_error(domain, exception)
107
108     logger.debug("Checking if domain='%s' has pending updates ...", domain)
109     if instances.has_pending(domain):
110         logger.debug("Flushing updates for domain='%s' ...", domain)
111         instances.update_data(domain)
112
113     logger.debug("processed='%s' - EXIT!", processed)
114     return processed
115
116 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
117     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
118     if not isinstance(tags, bs4.element.ResultSet):
119         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
120     elif not isinstance(search, str):
121         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
122     elif search == "":
123         raise ValueError("Parameter 'search' is empty")
124
125     domains = list()
126     logger.debug("Parsing %d tags ...", len(tags))
127     for tag in tags:
128         logger.debug("tag[]='%s'", type(tag))
129         domain = tidyup.domain(tag.find(search).contents[0])
130         logger.debug("domain='%s' - AFTER!", domain)
131
132         if domain == "":
133             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
134             domain = tidyup.domain(tag.find("em").contents[0])
135             logger.debug("domain='%s' - AFTER!", domain)
136
137         if domain == "":
138             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
139             continue
140
141         logger.debug("domain='%s' - BEFORE!", domain)
142         domain = domain.encode("idna").decode("utf-8")
143         logger.debug("domain='%s' - AFTER!", domain)
144
145         if not is_domain_wanted(domain):
146             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
147             continue
148
149         logger.debug("Appending domain='%s'", domain)
150         domains.append(domain)
151
152     logger.debug("domains()=%d - EXIT!", len(domains))
153     return domains
154
155 def is_domain_wanted(domain: str) -> bool:
156     logger.debug("domain='%s' - CALLED!", domain)
157
158     wanted = True
159     if not isinstance(domain, str):
160         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
161     elif domain == "":
162         raise ValueError("Parameter 'domain' is empty")
163     elif domain.lower() != domain:
164         wanted = False
165     elif not validators.domain(domain.split("/")[0]):
166         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
167         wanted = False
168     elif domain.endswith(".arpa"):
169         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
170         wanted = False
171     elif domain.endswith(".onion"):
172         logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
173         wanted = False
174     elif domain.endswith(".tld"):
175         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
176         wanted = False
177     elif blacklist.is_blacklisted(domain):
178         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
179         wanted = False
180     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
181         logger.debug("domain='%s' is a single user", domain)
182         wanted = False
183     elif domain.find("/tag/") > 0:
184         logger.debug("domain='%s' is a tag", domain)
185         wanted = False
186
187     logger.debug("wanted='%s' - EXIT!", wanted)
188     return wanted
189
190 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
191     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
192     domain_helper.raise_on(blocker)
193
194     if not isinstance(domain, str):
195         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
196     elif domain == "":
197         raise ValueError("Parameter domain is empty")
198     elif not isinstance(domain_hash, str) and domain_hash is not None:
199         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
200
201     if domain.find("*") >= 0:
202         logger.debug("blocker='%s' uses obfuscated domains", blocker)
203
204         # Obscured domain name with no hash
205         row = instances.deobfuscate("*", domain, domain_hash)
206
207         logger.debug("row[]='%s'", type(row))
208         if row is not None:
209             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
210             domain = row["domain"]
211         else:
212             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
213             instances.set_has_obfuscation(blocker, True)
214     elif domain.find("?") >= 0:
215         logger.debug("blocker='%s' uses obfuscated domains", blocker)
216
217         # Obscured domain name with no hash
218         row = instances.deobfuscate("?", domain, domain_hash)
219
220         logger.debug("row[]='%s'", type(row))
221         if row is not None:
222             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
223             domain = row["domain"]
224         else:
225             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
226             instances.set_has_obfuscation(blocker, True)
227     else:
228         logger.debug("domain='%s' is not obfuscated", domain)
229
230     logger.debug("domain='%s' - EXIT!", domain)
231     return domain
232
233 def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
234     logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
235     domain_helper.raise_on(blocker)
236     domain_helper.raise_on(blocked)
237
238     added = False
239     if not isinstance(reason, str) and reason is not None:
240         raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
241     elif not isinstance(block_level, str):
242         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
243     elif block_level == "":
244         raise ValueError("Parameter block_level is empty")
245
246     if not blocks.is_instance_blocked(blocker, blocked, block_level):
247         logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
248         blocks.add_instance(blocker, blocked, reason, block_level)
249         added = True
250     else:
251         logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
252         blocks.update_last_seen(blocker, blocked, block_level)
253
254     logger.debug("added='%s' - EXIT!", added)
255     return added
256
257 def alias_block_level(block_level: str) -> str:
258     logger.debug("block_level='%s' - CALLED!", block_level)
259     if not isinstance(block_level, str):
260         raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
261     elif block_level == "":
262         raise ValueError("Parameter 'block_level' is empty")
263
264     if block_level == "silence":
265         logger.debug("Block level 'silence' has been changed to 'silenced'")
266         block_level = "silenced"
267     elif block_level == "suspend":
268         logger.debug("Block level 'suspend' has been changed to 'suspended'")
269         block_level = "suspended"
270     elif block_level == "nsfw":
271         logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
272         block_level = "media_nsfw"
273
274     logger.debug("block_level='%s' - EXIT!", block_level)
275     return block_level