]> git.mxchange.org Git - fba.git/blob - fba/utils.py
fcc12424bd5b063c9f8ab88ad6f7cc6112084ead
[fba.git] / fba / utils.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import cookies
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import federation
31 from fba.http import network
32
33 from fba.models import instances
34
35 logging.basicConfig(level=logging.INFO)
36 logger = logging.getLogger(__name__)
37
38 ##### Other functions #####
39
40 def is_primitive(var: any) -> bool:
41     logger.debug(f"var[]='{type(var)}' - CALLED!")
42     return type(var) in {int, str, float, bool} or var is None
43
44 def get_hash(domain: str) -> str:
45     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
46     domain_helper.raise_on(domain)
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
51     logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not isinstance(headers, dict):
57         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
58     elif not isinstance(timeout, tuple):
59         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
60
61     logger.debug(f"Parsing url='{url}'")
62     components = urlparse(url)
63
64     # Invoke other function, avoid trailing ?
65     logger.debug(f"components[{type(components)}]={components}")
66     if components.query != "":
67         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
68     else:
69         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
70
71     logger.debug(f"response[]='{type(response)}' - EXXIT!")
72     return response
73
74 def process_domain(domain: str, blocker: str, command: str) -> bool:
75     logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
76     domain_helper.raise_on(domain)
77     domain_helper.raise_on(blocker)
78     if not isinstance(command, str):
79         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
80     elif command == "":
81         raise ValueError("Parameter 'command' is empty")
82
83     if domain.find("*") > 0:
84         # Try to de-obscure it
85         row = instances.deobscure("*", domain)
86
87         logger.debug(f"row[{type(row)}]='{row}'")
88         if row is None:
89             logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
90             return False
91
92         logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
93         domain = row[0]
94     elif domain.find("?") > 0:
95         # Try to de-obscure it
96         row = instances.deobscure("?", domain)
97
98         logger.debug(f"row[{type(row)}]='{row}'")
99         if row is None:
100             logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
101             return False
102
103         logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
104         domain = row[0]
105
106     if not is_domain_wanted(domain):
107         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
108         return False
109     elif instances.is_recent(domain):
110         logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
111         return False
112
113     processed = False
114     try:
115         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
116         federation.fetch_instances(domain, blocker, None, command)
117         processed = True
118
119         logger.debug("Invoking cookies.clear(%s) ...", domain)
120         cookies.clear(domain)
121     except network.exceptions as exception:
122         logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
123         instances.set_last_error(domain, exception)
124
125     logger.debug(f"processed='{processed}' - EXIT!")
126     return processed
127
128 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
129     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
130     if not isinstance(tags, bs4.element.ResultSet):
131         raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
132     elif not isinstance(search, str):
133         raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
134     elif search == "":
135         raise ValueError("Parameter 'search' is empty")
136
137     domains = list()
138     for tag in tags:
139         logger.debug("tag[]='%s'", type(tag))
140         domain = tidyup.domain(tag.find(search).contents[0])
141
142         logger.debug("domain='%s'", domain)
143         if domain == "":
144             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
145             domain = tidyup.domain(tag.find("em").contents[0])
146
147         if not is_domain_wanted(domain):
148             logger.debug("domain='%s' is not wanted - SKIPPED!")
149             continue
150
151         logger.debug("Appending domain='%s'", domain)
152         domains.append(domain)
153
154     logger.debug("domains()=%d - EXIT!", len(domains))
155     return domains
156
157 def is_domain_wanted (domain: str) -> bool:
158     logger.debug("domain='%s' - CALLED!", domain)
159
160     wanted = True
161     if not isinstance(domain, str):
162         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
163     elif domain == "":
164         raise ValueError("Parameter 'domain' is empty")
165     elif domain.lower() != domain:
166         wanted = False
167     elif not validators.domain(domain.split("/")[0]):
168         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
169         wanted = False
170     elif domain.endswith(".arpa"):
171         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
172         wanted = False
173     elif domain.endswith(".tld"):
174         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
175         wanted = False
176     elif blacklist.is_blacklisted(domain):
177         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
178         wanted = False
179
180     logger.debug("wanted='%s' - EXIT!", wanted)
181     return wanted