]> git.mxchange.org Git - fba.git/blob - fba/utils.py
1a1dffec0ab69fbda7458a137e3f85179e04060e
[fba.git] / fba / utils.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import cookies
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import federation
31 from fba.http import network
32
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug(f"var[]='{type(var)}' - CALLED!")
42     return type(var) in {int, str, float, bool} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
60
61     logger.debug(f"Parsing url='{url}'")
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug(f"components[{type(components)}]={components}")
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug(f"response[]='{type(response)}' - EXIT!")
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78
79     if not isinstance(command, str):
80         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
81     elif command == "":
82         raise ValueError("Parameter 'command' is empty")
83
84     if domain.find("*") > 0:
85         logger.debug("blocker='%s' uses obfucated domains, marking ...", blocker)
86         instances.set_has_obfucation(blocker, True)
87
88         # Try to de-obscure it
89         row = instances.deobfucate("*", domain)
90
91         logger.debug(f"row[{type(row)}]='{row}'")
92         if row is None:
93             logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
94             return False
95
96         logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
97         domain = row[0]
98     elif domain.find("?") > 0:
99         logger.debug("blocker='%s' uses obfucated domains, marking ...", blocker)
100         instances.set_has_obfucation(blocker, True)
101
102         # Try to de-obscure it
103         row = instances.deobfucate("?", domain)
104
105         logger.debug(f"row[{type(row)}]='{row}'")
106         if row is None:
107             logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
108             return False
109
110         logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
111         domain = row[0]
112     else:
113         logger.debug("blocker='%s' has NO obfucation on their block list", blocker)
114         instances.set_has_obfucation(blocker, False)
115
116     if instances.has_pending(blocker):
117         logger.debug("Invoking instances.update_data(%s) ...", blocker)
118         instances.update_data(blocker)
119
120     if not is_domain_wanted(domain):
121         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
122         return False
123     elif instances.is_recent(domain):
124         logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
125         return False
126
127     processed = False
128     try:
129         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
130         federation.fetch_instances(domain, blocker, None, command)
131         processed = True
132
133         logger.debug("Invoking cookies.clear(%s) ...", domain)
134         cookies.clear(domain)
135     except network.exceptions as exception:
136         logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
137         instances.set_last_error(domain, exception)
138
139     logger.debug(f"processed='{processed}' - EXIT!")
140     return processed
141
142 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
143     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
144     if not isinstance(tags, bs4.element.ResultSet):
145         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
146     elif not isinstance(search, str):
147         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
148     elif search == "":
149         raise ValueError("Parameter 'search' is empty")
150
151     domains = list()
152     for tag in tags:
153         logger.debug("tag[]='%s'", type(tag))
154         domain = tidyup.domain(tag.find(search).contents[0])
155
156         logger.debug("domain='%s'", domain)
157         if domain == "":
158             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
159             domain = tidyup.domain(tag.find("em").contents[0])
160
161         if not is_domain_wanted(domain):
162             logger.debug("domain='%s' is not wanted - SKIPPED!")
163             continue
164
165         logger.debug("Appending domain='%s'", domain)
166         domains.append(domain)
167
168     logger.debug("domains()=%d - EXIT!", len(domains))
169     return domains
170
171 def is_domain_wanted(domain: str) -> bool:
172     logger.debug("domain='%s' - CALLED!", domain)
173
174     wanted = True
175     if not isinstance(domain, str):
176         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
177     elif domain == "":
178         raise ValueError("Parameter 'domain' is empty")
179     elif domain.lower() != domain:
180         wanted = False
181     elif not validators.domain(domain.split("/")[0]):
182         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
183         wanted = False
184     elif domain.endswith(".arpa"):
185         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
186         wanted = False
187     elif domain.endswith(".tld"):
188         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
189         wanted = False
190     elif blacklist.is_blacklisted(domain):
191         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
192         wanted = False
193
194     logger.debug("wanted='%s' - EXIT!", wanted)
195     return wanted