]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import instances
33
34 logging.basicConfig(level=logging.INFO)
35 logger = logging.getLogger(__name__)
36
37 ##### Other functions #####
38
39 def is_primitive(var: any) -> bool:
40     logger.debug("var[]='%s' - CALLED!", type(var))
41     return type(var) in {int, str, float, bool} or var is None
42
43 def get_hash(domain: str) -> str:
44     logger.debug("domain='%s' - CALLED!", domain)
45     domain_helper.raise_on(domain)
46
47     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48
49 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
50     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
51     if not isinstance(url, str):
52         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
53     elif url == "":
54         raise ValueError("Parameter 'url' is empty")
55     elif not isinstance(headers, dict):
56         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
57     elif not isinstance(timeout, tuple):
58         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
59
60     logger.debug("Parsing url='%s' ...", url)
61     components = urlparse(url)
62
63     # Invoke other function, avoid trailing ?
64     logger.debug("components[%s]='%s'", type(components), components)
65     if components.query != "":
66         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
67     else:
68         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
69
70     logger.debug("response[]='%s' - EXIT!", type(response))
71     return response
72
73 def process_domain(domain: str, blocker: str, command: str) -> bool:
74     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
75     domain_helper.raise_on(domain)
76     domain_helper.raise_on(blocker)
77
78     if not isinstance(command, str):
79         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
80     elif command == "":
81         raise ValueError("Parameter 'command' is empty")
82
83     logger.debug("domain='%s' - BEFORE!")
84     if domain.find("*") > 0:
85         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
86         instances.set_has_obfuscation(blocker, True)
87
88         # Try to de-obscure it
89         row = instances.deobfuscate("*", domain)
90
91         logger.debug("row[%s]='%s'", type(row), row)
92         if row is None:
93             logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
94             return False
95
96         logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
97         domain = row[0]
98     elif domain.find("?") > 0:
99         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
100         instances.set_has_obfuscation(blocker, True)
101
102         # Try to de-obscure it
103         row = instances.deobfuscate("?", domain)
104
105         logger.debug("row[%s]='%s'", type(row), row)
106         if row is None:
107             logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
108             return False
109
110         logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
111         domain = row[0]
112     else:
113         logger.debug("blocker='%s' has NO obfuscation on their block list", blocker)
114         instances.set_has_obfuscation(blocker, False)
115
116     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
117     if instances.has_pending(blocker):
118         logger.debug("Flushing updates for blocker='%s' ...", blocker)
119         instances.update_data(blocker)
120
121     if not is_domain_wanted(domain):
122         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
123         return False
124     elif instances.is_recent(domain):
125         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
126         return False
127
128     processed = False
129     try:
130         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
131         federation.fetch_instances(domain, blocker, None, command)
132         processed = True
133     except network.exceptions as exception:
134         logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
135         instances.set_last_error(domain, exception)
136
137     logger.debug("Checking if domain='%s' has pending updates ...")
138     if instances.has_pending(domain):
139         logger.debug("Flushing updates for domain='%s' ...")
140         instances.update_data(domain)
141
142     logger.debug("processed='%s' - EXIT!", processed)
143     return processed
144
145 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
146     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
147     if not isinstance(tags, bs4.element.ResultSet):
148         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
149     elif not isinstance(search, str):
150         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
151     elif search == "":
152         raise ValueError("Parameter 'search' is empty")
153
154     domains = list()
155     logger.debug("Parsing %d tags ...", len(tags))
156     for tag in tags:
157         logger.debug("tag[]='%s'", type(tag))
158         domain = tidyup.domain(tag.find(search).contents[0])
159
160         logger.debug("domain='%s'", domain)
161         if domain == "":
162             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
163             domain = tidyup.domain(tag.find("em").contents[0])
164
165         if not is_domain_wanted(domain):
166             logger.debug("domain='%s' is not wanted - SKIPPED!")
167             continue
168
169         logger.debug("Appending domain='%s'", domain)
170         domains.append(domain)
171
172     logger.debug("domains()=%d - EXIT!", len(domains))
173     return domains
174
175 def is_domain_wanted(domain: str) -> bool:
176     logger.debug("domain='%s' - CALLED!", domain)
177
178     wanted = True
179     if not isinstance(domain, str):
180         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
181     elif domain == "":
182         raise ValueError("Parameter 'domain' is empty")
183     elif domain.lower() != domain:
184         wanted = False
185     elif not validators.domain(domain.split("/")[0]):
186         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
187         wanted = False
188     elif domain.endswith(".arpa"):
189         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
190         wanted = False
191     elif domain.endswith(".tld"):
192         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
193         wanted = False
194     elif blacklist.is_blacklisted(domain):
195         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
196         wanted = False
197
198     logger.debug("wanted='%s' - EXIT!", wanted)
199     return wanted
200
201 def deobfuscate_domain(domain: str, blocker: str) -> str:
202     logger.debug("domain='%s',blocker='%s' - CALLED!", domain, blocker)
203     domain_helper.raise_on(domain)
204     domain_helper.raise_on(blocker)
205
206     if domain.count("*") > 0:
207         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
208         instances.set_has_obfuscation(blocker, True)
209
210         # Obscured domain name with no hash
211         row = instances.deobfuscate("*", domain)
212
213         logger.debug("row[]='%s'", type(row))
214         if row is not None:
215             logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
216             domain = row[0]
217     elif domain.count("?") > 0:
218         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
219         instances.set_has_obfuscation(blocker, True)
220
221         # Obscured domain name with no hash
222         row = instances.deobfuscate("?", domain)
223
224         logger.debug("row[]='%s'", type(row))
225         if row is not None:
226             logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
227             domain = row[0]
228     else:
229         logger.debug("domain='%s' is not obfuscated", domain)
230
231     logger.debug("domain='%s' - EXIT!", domain)
232     return domain