]> git.mxchange.org Git - fba.git/blob - fba/utils.py
46fa37efee7ca31f8f883390796b17b1ed24a19f
[fba.git] / fba / utils.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import cookies
27 from fba.helpers import tidyup
28
29 from fba.http import federation
30 from fba.http import network
31
32 from fba.models import instances
33
34 logging.basicConfig(level=logging.INFO)
35 logger = logging.getLogger(__name__)
36
37 ##### Other functions #####
38
39 def is_primitive(var: any) -> bool:
40     logger.debug(f"var[]='{type(var)}' - CALLED!")
41     return type(var) in {int, str, float, bool} or var is None
42
43 def get_hash(domain: str) -> str:
44     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
45     if not isinstance(domain, str):
46         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
47     elif domain == "":
48         raise ValueError("Parameter 'domain' is empty")
49     elif domain.lower() != domain:
50         raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
51     elif not validators.domain(domain.split("/")[0]):
52         raise ValueError(f"domain='{domain}' is not a valid domain")
53     elif domain.endswith(".arpa"):
54         raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
55     elif domain.endswith(".tld"):
56         raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
57
58     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
59
60 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
61     logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
62     if not isinstance(url, str):
63         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
64     elif url == "":
65         raise ValueError("Parameter 'url' is empty")
66     elif not isinstance(headers, dict):
67         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
68     elif not isinstance(timeout, tuple):
69         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
70
71     logger.debug(f"Parsing url='{url}'")
72     components = urlparse(url)
73
74     # Invoke other function, avoid trailing ?
75     logger.debug(f"components[{type(components)}]={components}")
76     if components.query != "":
77         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
78     else:
79         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
80
81     logger.debug(f"response[]='{type(response)}' - EXXIT!")
82     return response
83
84 def process_domain(domain: str, blocker: str, command: str) -> bool:
85     logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
86     if not isinstance(domain, str):
87         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
88     elif domain == "":
89         raise ValueError("Parameter 'domain' is empty")
90     elif domain.lower() != domain:
91         raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
92     elif not validators.domain(domain.split("/")[0]):
93         raise ValueError(f"domain='{domain}' is not a valid domain")
94     elif domain.endswith(".arpa"):
95         raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
96     elif domain.endswith(".tld"):
97         raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
98     elif not isinstance(blocker, str):
99         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
100     elif blocker == "":
101         raise ValueError("Parameter 'blocker' is empty")
102     elif not validators.domain(blocker.split("/")[0]):
103         raise ValueError(f"blocker='{blocker}' is not a valid domain")
104     elif blocker.endswith(".arpa"):
105         raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
106     elif blocker.endswith(".tld"):
107         raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
108     elif not isinstance(command, str):
109         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
110     elif command == "":
111         raise ValueError("Parameter 'command' is empty")
112
113     if domain.find("*") > 0:
114         # Try to de-obscure it
115         row = instances.deobscure("*", domain)
116
117         logger.debug(f"row[{type(row)}]='{row}'")
118         if row is None:
119             logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!")
120             return False
121
122         logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
123         domain = row[0]
124     elif domain.find("?") > 0:
125         # Try to de-obscure it
126         row = instances.deobscure("?", domain)
127
128         logger.debug(f"row[{type(row)}]='{row}'")
129         if row is None:
130             logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!")
131             return False
132
133         logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
134         domain = row[0]
135
136     if not is_domain_wanted(domain)
137         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
138         return False
139     elif instances.is_recent(domain):
140         logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
141         return False
142
143     processed = False
144     try:
145         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
146         federation.fetch_instances(domain, blocker, None, command)
147         processed = True
148
149         logger.debug("Invoking cookies.clear(%s) ...", domain)
150         cookies.clear(domain)
151     except network.exceptions as exception:
152         logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
153         instances.set_last_error(domain, exception)
154
155     logger.debug(f"processed='{processed}' - EXIT!")
156     return processed
157
158 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
159     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
160     if not isinstance(tags, bs4.element.ResultSet):
161         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
162     elif not isinstance(search, str):
163         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
164     elif search == "":
165         raise ValueError("Parameter 'search' is empty")
166
167     domains = list()
168     for tag in tags:
169         logger.debug("tag[]='%s'", type(tag))
170         domain = tidyup.domain(tag.find(search).contents[0])
171
172         logger.debug("domain='%s'", domain)
173         if domain == "":
174             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
175             domain = tidyup.domain(tag.find("em").contents[0])
176
177         if not is_domain_wanted(domain):
178             logger.debug("domain='%s' is not wanted - SKIPPED!")
179             continue
180
181         logger.debug("Appending domain='%s'", domain)
182         domains.append(domain)
183
184     logger.debug("domains()=%d - EXIT!", len(domains))
185     return domains
186
187 def is_domain_wanted (domain: str) -> bool:
188     logger.debug("domain='%s' - CALLED!", domain)
189     wanted = True
190
191     if not isinstance(domain, str):
192         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
193     elif domain == "":
194         raise ValueError("Parameter 'domain' is empty")
195     elif domain.lower() != domain:
196         raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
197     elif not validators.domain(domain.split("/")[0]):
198         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
199         wanted = False
200     elif domain.endswith(".arpa"):
201         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
202         wanted = False
203     elif domain.endswith(".tld"):
204         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
205         wanted = False
206     elif blacklist.is_blacklisted(domain):
207         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
208         wanted = False
209
210     logger.debug("wanted='%s' - EXIT!", wanted)
211     return wanted