]> git.mxchange.org Git - fba.git/blob - fba/utils.py
019938c4f6c89d14be0c3a91f29b33ec89b9a652
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import blocks
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug("var[]='%s' - CALLED!", type(var))
42     return type(var) in {int, str, float, bool, None} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain='%s' - CALLED!", domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
60
61     logger.debug("Parsing url='%s' ...", url)
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug("components[%s]='%s'", type(components), components)
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug("response[]='%s' - EXIT!", type(response))
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78
79     if not isinstance(command, str):
80         raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
81     elif command == "":
82         raise ValueError("Parameter 'command' is empty")
83
84     logger.debug("domain='%s' - BEFORE!", domain)
85     domain = deobfuscate(domain, blocker)
86
87     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
88     if instances.has_pending(blocker):
89         logger.debug("Flushing updates for blocker='%s' ...", blocker)
90         instances.update_data(blocker)
91
92     if not is_domain_wanted(domain):
93         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
94         return False
95     elif instances.is_recent(domain):
96         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
97         return False
98
99     processed = False
100     try:
101         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
102         federation.fetch_instances(domain, blocker, None, command)
103         processed = True
104     except network.exceptions as exception:
105         logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
106         instances.set_last_error(domain, exception)
107
108     logger.debug("Checking if domain='%s' has pending updates ...", domain)
109     if instances.has_pending(domain):
110         logger.debug("Flushing updates for domain='%s' ...", domain)
111         instances.update_data(domain)
112
113     logger.debug("processed='%s' - EXIT!", processed)
114     return processed
115
116 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
117     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
118     if not isinstance(tags, bs4.element.ResultSet):
119         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
120     elif not isinstance(search, str):
121         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
122     elif search == "":
123         raise ValueError("Parameter 'search' is empty")
124
125     domains = list()
126     logger.debug("Parsing %d tags ...", len(tags))
127     for tag in tags:
128         logger.debug("tag[]='%s'", type(tag))
129         domain = tidyup.domain(tag.find(search).contents[0])
130         logger.debug("domain='%s' - AFTER!", domain)
131
132         if domain == "":
133             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
134             domain = tidyup.domain(tag.find("em").contents[0])
135             logger.debug("domain='%s' - AFTER!", domain)
136
137         if domain == "":
138             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
139             continue
140
141         logger.debug("domain='%s' - BEFORE!", domain)
142         domain = domain.encode("idna").decode("utf-8")
143         logger.debug("domain='%s' - AFTER!", domain)
144
145         if not is_domain_wanted(domain):
146             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
147             continue
148
149         logger.debug("Appending domain='%s'", domain)
150         domains.append(domain)
151
152     logger.debug("domains()=%d - EXIT!", len(domains))
153     return domains
154
155 def is_domain_wanted(domain: str) -> bool:
156     logger.debug("domain='%s' - CALLED!", domain)
157
158     wanted = True
159     if not isinstance(domain, str):
160         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
161     elif domain == "":
162         raise ValueError("Parameter 'domain' is empty")
163     elif domain.lower() != domain:
164         wanted = False
165     elif not validators.domain(domain.split("/")[0]):
166         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
167         wanted = False
168     elif domain.endswith(".arpa"):
169         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
170         wanted = False
171     elif domain.endswith(".onion"):
172         logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
173         wanted = False
174     elif domain.endswith(".tld"):
175         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
176         wanted = False
177     elif "xn--" in domain:
178         raise ValueError(f"domain='{domain}' is a punycode domain, please don't crawl them!")
179     elif blacklist.is_blacklisted(domain):
180         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
181         wanted = False
182     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
183         logger.debug("domain='%s' is a single user", domain)
184         wanted = False
185     elif domain.find("/tag/") > 0:
186         logger.debug("domain='%s' is a tag", domain)
187         wanted = False
188
189     logger.debug("wanted='%s' - EXIT!", wanted)
190     return wanted
191
192 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
193     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
194     domain_helper.raise_on(blocker)
195
196     if not isinstance(domain, str):
197         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
198     elif domain == "":
199         raise ValueError("Parameter domain is empty")
200     elif not isinstance(domain_hash, str) and domain_hash is not None:
201         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
202
203     if domain.find("*") >= 0:
204         logger.debug("blocker='%s' uses obfuscated domains", blocker)
205
206         # Obscured domain name with no hash
207         row = instances.deobfuscate("*", domain, domain_hash)
208
209         logger.debug("row[]='%s'", type(row))
210         if row is not None:
211             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
212             domain = row["domain"]
213         else:
214             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
215             instances.set_has_obfuscation(blocker, True)
216     elif domain.find("?") >= 0:
217         logger.debug("blocker='%s' uses obfuscated domains", blocker)
218
219         # Obscured domain name with no hash
220         row = instances.deobfuscate("?", domain, domain_hash)
221
222         logger.debug("row[]='%s'", type(row))
223         if row is not None:
224             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
225             domain = row["domain"]
226         else:
227             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
228             instances.set_has_obfuscation(blocker, True)
229     else:
230         logger.debug("domain='%s' is not obfuscated", domain)
231
232     logger.debug("domain='%s' - EXIT!", domain)
233     return domain
234
235 def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
236     logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
237     domain_helper.raise_on(blocker)
238     domain_helper.raise_on(blocked)
239
240     added = False
241     if not isinstance(reason, str) and reason is not None:
242         raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
243     elif not isinstance(block_level, str):
244         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
245     elif block_level == "":
246         raise ValueError("Parameter block_level is empty")
247
248     if not blocks.is_instance_blocked(blocker, blocked, block_level):
249         logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
250         blocks.add_instance(blocker, blocked, reason, block_level)
251         added = True
252     else:
253         logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
254         blocks.update_last_seen(blocker, blocked, block_level)
255
256     logger.debug("added='%s' - EXIT!", added)
257     return added
258
259 def alias_block_level(block_level: str) -> str:
260     logger.debug("block_level='%s' - CALLED!", block_level)
261     if not isinstance(block_level, str):
262         raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
263     elif block_level == "":
264         raise ValueError("Parameter 'block_level' is empty")
265
266     if block_level == "silence":
267         logger.debug("Block level 'silence' has been changed to 'silenced'")
268         block_level = "silenced"
269     elif block_level == "suspend":
270         logger.debug("Block level 'suspend' has been changed to 'suspended'")
271         block_level = "suspended"
272     elif block_level == "nsfw":
273         logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
274         block_level = "media_nsfw"
275
276     logger.debug("block_level='%s' - EXIT!", block_level)
277     return block_level