]> git.mxchange.org Git - fba.git/blob - fba/utils.py
577e9ab4635c317cd06064985bc308570b73db9f
[fba.git] / fba / utils.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import cookies
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import federation
31 from fba.http import network
32
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug("var[]='%s' - CALLED!", type(var))
42     return type(var) in {int, str, float, bool} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain='%s' - CALLED!", domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
60
61     logger.debug("Parsing url='%s' ...", url)
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug("components[%s]='%s'", type(components), components)
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug("response[]='%s' - EXIT!", type(response))
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78
79     if not isinstance(command, str):
80         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
81     elif command == "":
82         raise ValueError("Parameter 'command' is empty")
83
84     logger.debug("domain='%s' - BEFORE!")
85     if domain.find("*") > 0:
86         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
87         instances.set_has_obfuscation(blocker, True)
88
89         # Try to de-obscure it
90         row = instances.deobfuscate("*", domain)
91
92         logger.debug("row[%s]='%s'", type(row), row)
93         if row is None:
94             logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
95             return False
96
97         logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
98         domain = row[0]
99     elif domain.find("?") > 0:
100         logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
101         instances.set_has_obfuscation(blocker, True)
102
103         # Try to de-obscure it
104         row = instances.deobfuscate("?", domain)
105
106         logger.debug("row[%s]='%s'", type(row), row)
107         if row is None:
108             logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
109             return False
110
111         logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
112         domain = row[0]
113     else:
114         logger.debug("blocker='%s' has NO obfuscation on their block list", blocker)
115         instances.set_has_obfuscation(blocker, False)
116
117     logger.debug("domain='%s' - DEOBFUSCATED!", domain)
118     if instances.has_pending(blocker):
119         logger.debug("Invoking instances.update_data(%s) ...", blocker)
120         instances.update_data(blocker)
121
122     if not is_domain_wanted(domain):
123         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
124         return False
125     elif instances.is_recent(domain):
126         logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
127         return False
128
129     processed = False
130     try:
131         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
132         federation.fetch_instances(domain, blocker, None, command)
133         processed = True
134
135         logger.debug("Invoking cookies.clear(%s) ...", domain)
136         cookies.clear(domain)
137     except network.exceptions as exception:
138         logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
139         instances.set_last_error(domain, exception)
140
141     logger.debug("processed='%s' - EXIT!", processed)
142     return processed
143
144 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
145     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
146     if not isinstance(tags, bs4.element.ResultSet):
147         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
148     elif not isinstance(search, str):
149         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
150     elif search == "":
151         raise ValueError("Parameter 'search' is empty")
152
153     domains = list()
154     logger.debug("Parsing %d tags ...", len(tags))
155     for tag in tags:
156         logger.debug("tag[]='%s'", type(tag))
157         domain = tidyup.domain(tag.find(search).contents[0])
158
159         logger.debug("domain='%s'", domain)
160         if domain == "":
161             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
162             domain = tidyup.domain(tag.find("em").contents[0])
163
164         if not is_domain_wanted(domain):
165             logger.debug("domain='%s' is not wanted - SKIPPED!")
166             continue
167
168         logger.debug("Appending domain='%s'", domain)
169         domains.append(domain)
170
171     logger.debug("domains()=%d - EXIT!", len(domains))
172     return domains
173
174 def is_domain_wanted(domain: str) -> bool:
175     logger.debug("domain='%s' - CALLED!", domain)
176
177     wanted = True
178     if not isinstance(domain, str):
179         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
180     elif domain == "":
181         raise ValueError("Parameter 'domain' is empty")
182     elif domain.lower() != domain:
183         wanted = False
184     elif not validators.domain(domain.split("/")[0]):
185         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
186         wanted = False
187     elif domain.endswith(".arpa"):
188         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
189         wanted = False
190     elif domain.endswith(".tld"):
191         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
192         wanted = False
193     elif blacklist.is_blacklisted(domain):
194         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
195         wanted = False
196
197     logger.debug("wanted='%s' - EXIT!", wanted)
198     return wanted