]> git.mxchange.org Git - fba.git/blob - fba/utils.py
79fb52f1c480e5730332347a66114af7aa8b0877
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import blocks
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug("var[]='%s' - CALLED!", type(var))
42     return type(var) in {int, str, float, bool} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain='%s' - CALLED!", domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
60
61     logger.debug("Parsing url='%s' ...", url)
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug("components[%s]='%s'", type(components), components)
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug("response[]='%s' - EXIT!", type(response))
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78
79     if not isinstance(command, str):
80         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
81     elif command == "":
82         raise ValueError("Parameter 'command' is empty")
83
84     logger.debug("domain='%s' - BEFORE!", domain)
85     domain = deobfuscate_domain(domain, blocker)
86
87     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
88     if instances.has_pending(blocker):
89         logger.debug("Flushing updates for blocker='%s' ...", blocker)
90         instances.update_data(blocker)
91
92     if not is_domain_wanted(domain):
93         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
94         return False
95     elif instances.is_recent(domain):
96         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
97         return False
98
99     processed = False
100     try:
101         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
102         federation.fetch_instances(domain, blocker, None, command)
103         processed = True
104     except network.exceptions as exception:
105         logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
106         instances.set_last_error(domain, exception)
107
108     logger.debug("Checking if domain='%s' has pending updates ...", domain)
109     if instances.has_pending(domain):
110         logger.debug("Flushing updates for domain='%s' ...", domain)
111         instances.update_data(domain)
112
113     logger.debug("processed='%s' - EXIT!", processed)
114     return processed
115
116 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
117     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
118     if not isinstance(tags, bs4.element.ResultSet):
119         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
120     elif not isinstance(search, str):
121         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
122     elif search == "":
123         raise ValueError("Parameter 'search' is empty")
124
125     domains = list()
126     logger.debug("Parsing %d tags ...", len(tags))
127     for tag in tags:
128         logger.debug("tag[]='%s'", type(tag))
129         domain = tidyup.domain(tag.find(search).contents[0])
130
131         logger.debug("domain='%s'", domain)
132         if domain == "":
133             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
134             domain = tidyup.domain(tag.find("em").contents[0])
135
136         if not is_domain_wanted(domain):
137             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
138             continue
139
140         logger.debug("Appending domain='%s'", domain)
141         domains.append(domain)
142
143     logger.debug("domains()=%d - EXIT!", len(domains))
144     return domains
145
146 def is_domain_wanted(domain: str) -> bool:
147     logger.debug("domain='%s' - CALLED!", domain)
148
149     wanted = True
150     if not isinstance(domain, str):
151         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
152     elif domain == "":
153         raise ValueError("Parameter 'domain' is empty")
154     elif domain.lower() != domain:
155         wanted = False
156     elif not validators.domain(domain.split("/")[0]):
157         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
158         wanted = False
159     elif domain.endswith(".arpa"):
160         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
161         wanted = False
162     elif domain.endswith(".onion"):
163         logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
164         wanted = False
165     elif domain.endswith(".tld"):
166         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
167         wanted = False
168     elif blacklist.is_blacklisted(domain):
169         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
170         wanted = False
171     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
172         logger.debug("domain='%s' is a single user", domain)
173         wanted = False
174     elif domain.find("/tag/") > 0:
175         logger.debug("domain='%s' is a tag", domain)
176         wanted = False
177
178     logger.debug("wanted='%s' - EXIT!", wanted)
179     return wanted
180
181 def deobfuscate_domain(domain: str, blocker: str, domain_hash: str = None) -> str:
182     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
183     domain_helper.raise_on(blocker)
184
185     if not isinstance(domain, str):
186         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
187     elif domain == "":
188         raise ValueError("Parameter domain is empty")
189     elif not isinstance(domain_hash, str) and domain_hash is not None:
190         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
191
192     if domain.find("*") >= 0:
193         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
194         instances.set_has_obfuscation(blocker, True)
195
196         # Obscured domain name with no hash
197         row = instances.deobfuscate("*", domain, domain_hash)
198
199         logger.debug("row[]='%s'", type(row))
200         if row is not None:
201             logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
202             domain = row[0]
203     elif domain.find("?") >= 0:
204         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
205         instances.set_has_obfuscation(blocker, True)
206
207         # Obscured domain name with no hash
208         row = instances.deobfuscate("?", domain, domain_hash)
209
210         logger.debug("row[]='%s'", type(row))
211         if row is not None:
212             logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
213             domain = row[0]
214     else:
215         logger.debug("domain='%s' is not obfuscated", domain)
216
217     logger.debug("domain='%s' - EXIT!", domain)
218     return domain
219
220 def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
221     logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
222     domain_helper.raise_on(blocker)
223     domain_helper.raise_on(blocked)
224     procesed = False
225
226     if not isinstance(reason, str) and reason is not None:
227         raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
228     elif not isinstance(block_level, str):
229         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
230     elif block_level == "":
231         raise ValueError("Parameter block_level is empty")
232
233     if not blocks.is_instance_blocked(blocker, blocked, block_level):
234         logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
235         blocks.add_instance(blocker, blocked, reason, block_level)
236         added = True
237     else:
238         logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
239         blocks.update_last_seen(blocker, blocked, block_level)
240
241     logger.debug("added='%s' - EXIT!", added)
242     return procesed
243