]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import instances
33
34 logging.basicConfig(level=logging.INFO)
35 logger = logging.getLogger(__name__)
36
37 ##### Other functions #####
38
39 def is_primitive(var: any) -> bool:
40     logger.debug("var[]='%s' - CALLED!", type(var))
41     return type(var) in {int, str, float, bool} or var is None
42
43 def get_hash(domain: str) -> str:
44     logger.debug("domain='%s' - CALLED!", domain)
45     domain_helper.raise_on(domain)
46
47     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48
49 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
50     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
51     if not isinstance(url, str):
52         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
53     elif url == "":
54         raise ValueError("Parameter 'url' is empty")
55     elif not isinstance(headers, dict):
56         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
57     elif not isinstance(timeout, tuple):
58         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
59
60     logger.debug("Parsing url='%s' ...", url)
61     components = urlparse(url)
62
63     # Invoke other function, avoid trailing ?
64     logger.debug("components[%s]='%s'", type(components), components)
65     if components.query != "":
66         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
67     else:
68         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
69
70     logger.debug("response[]='%s' - EXIT!", type(response))
71     return response
72
73 def process_domain(domain: str, blocker: str, command: str) -> bool:
74     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
75     domain_helper.raise_on(domain)
76     domain_helper.raise_on(blocker)
77
78     if not isinstance(command, str):
79         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
80     elif command == "":
81         raise ValueError("Parameter 'command' is empty")
82
83     logger.debug("domain='%s' - BEFORE!")
84     if domain.find("*") > 0:
85         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
86         instances.set_has_obfuscation(blocker, True)
87
88         # Try to de-obscure it
89         row = instances.deobfuscate("*", domain)
90
91         logger.debug("row[%s]='%s'", type(row), row)
92         if row is None:
93             logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
94             return False
95
96         logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
97         domain = row[0]
98     elif domain.find("?") > 0:
99         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
100         instances.set_has_obfuscation(blocker, True)
101
102         # Try to de-obscure it
103         row = instances.deobfuscate("?", domain)
104
105         logger.debug("row[%s]='%s'", type(row), row)
106         if row is None:
107             logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
108             return False
109
110         logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
111         domain = row[0]
112     else:
113         logger.debug("blocker='%s' has NO obfuscation on their block list", blocker)
114         instances.set_has_obfuscation(blocker, False)
115
116     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
117     if instances.has_pending(blocker):
118         logger.debug("Flushing updates for blocker='%s' ...", blocker)
119         instances.update_data(blocker)
120
121     if not is_domain_wanted(domain):
122         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
123         return False
124     elif instances.is_recent(domain):
125         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
126         return False
127
128     processed = False
129     try:
130         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
131         federation.fetch_instances(domain, blocker, None, command)
132         processed = True
133     except network.exceptions as exception:
134         logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
135         instances.set_last_error(domain, exception)
136
137     logger.debug("Checking if domain='%s' has pending updates ...")
138     if instances.has_pending(domain):
139         logger.debug("Flushing updates for domain='%s' ...")
140         instances.update_data(domain)
141
142     logger.debug("processed='%s' - EXIT!", processed)
143     return processed
144
145 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
146     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
147     if not isinstance(tags, bs4.element.ResultSet):
148         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
149     elif not isinstance(search, str):
150         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
151     elif search == "":
152         raise ValueError("Parameter 'search' is empty")
153
154     domains = list()
155     logger.debug("Parsing %d tags ...", len(tags))
156     for tag in tags:
157         logger.debug("tag[]='%s'", type(tag))
158         domain = tidyup.domain(tag.find(search).contents[0])
159
160         logger.debug("domain='%s'", domain)
161         if domain == "":
162             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
163             domain = tidyup.domain(tag.find("em").contents[0])
164
165         if not is_domain_wanted(domain):
166             logger.debug("domain='%s' is not wanted - SKIPPED!")
167             continue
168
169         logger.debug("Appending domain='%s'", domain)
170         domains.append(domain)
171
172     logger.debug("domains()=%d - EXIT!", len(domains))
173     return domains
174
175 def is_domain_wanted(domain: str) -> bool:
176     logger.debug("domain='%s' - CALLED!", domain)
177
178     wanted = True
179     if not isinstance(domain, str):
180         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
181     elif domain == "":
182         raise ValueError("Parameter 'domain' is empty")
183     elif domain.lower() != domain:
184         wanted = False
185     elif not validators.domain(domain.split("/")[0]):
186         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
187         wanted = False
188     elif domain.endswith(".arpa"):
189         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
190         wanted = False
191     elif domain.endswith(".tld"):
192         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
193         wanted = False
194     elif blacklist.is_blacklisted(domain):
195         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
196         wanted = False
197     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
198         logger.debug("domain='%s' is a single user", domain)
199         wanted = False
200     elif domain.find("/tag/") > 0:
201         logger.debug("domain='%s' is a tag", domain)
202         wanted = False
203
204     logger.debug("wanted='%s' - EXIT!", wanted)
205     return wanted
206
207 def deobfuscate_domain(domain: str, blocker: str) -> str:
208     logger.debug("domain='%s',blocker='%s' - CALLED!", domain, blocker)
209     domain_helper.raise_on(domain)
210     domain_helper.raise_on(blocker)
211
212     if domain.count("*") > 0:
213         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
214         instances.set_has_obfuscation(blocker, True)
215
216         # Obscured domain name with no hash
217         row = instances.deobfuscate("*", domain)
218
219         logger.debug("row[]='%s'", type(row))
220         if row is not None:
221             logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
222             domain = row[0]
223     elif domain.count("?") > 0:
224         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
225         instances.set_has_obfuscation(blocker, True)
226
227         # Obscured domain name with no hash
228         row = instances.deobfuscate("?", domain)
229
230         logger.debug("row[]='%s'", type(row))
231         if row is not None:
232             logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
233             domain = row[0]
234     else:
235         logger.debug("domain='%s' is not obfuscated", domain)
236
237     logger.debug("domain='%s' - EXIT!", domain)
238     return domain