]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import config
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import network
31
32 from fba.models import instances
33
34 logging.basicConfig(level=logging.INFO)
35 logger = logging.getLogger(__name__)
36
37 ##### Other functions #####
38
39 def is_primitive(var: any) -> bool:
40     logger.debug("var[]='%s' - CALLED!", type(var))
41     return type(var) in {int, str, float, bool, None} or var is None
42
43 def get_hash(domain: str) -> str:
44     logger.debug("domain='%s' - CALLED!", domain)
45     domain_helper.raise_on(domain)
46
47     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48
49 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
50     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
51
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not validators.url(url):
57         raise ValueError(f"Parameter url='{url}' is not a valid URL")
58     elif not isinstance(headers, dict):
59         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
60     elif not isinstance(timeout, tuple):
61         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
62
63     logger.debug("Parsing url='%s' ...", url)
64     components = urlparse(url)
65
66     # Invoke other function, avoid trailing ?
67     logger.debug("components[%s]='%s'", type(components), components)
68     if components.query != "":
69         logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc)
70         response = network.fetch_response(
71             components.netloc.split(":")[0],
72             f"{components.path}?{components.query}",
73             headers,
74             timeout
75         )
76     else:
77         logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
78         response = network.fetch_response(
79             components.netloc.split(":")[0],
80             components.path if isinstance(components.path, str) and components.path != '' else '/',
81             headers,
82             timeout
83         )
84
85     logger.debug("response[]='%s' - EXIT!", type(response))
86     return response
87
88 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
89     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
90
91     if not isinstance(tags, bs4.element.ResultSet):
92         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
93     elif not isinstance(search, str):
94         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
95     elif search == "":
96         raise ValueError("Parameter 'search' is empty")
97
98     domains = list()
99     logger.debug("Parsing %d tags ...", len(tags))
100     for tag in tags:
101         logger.debug("tag[]='%s'", type(tag))
102         domain = tidyup.domain(tag.find(search).contents[0])
103         logger.debug("domain='%s' - AFTER!", domain)
104
105         if domain == "":
106             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
107             domain = tidyup.domain(tag.find("em").contents[0])
108             logger.debug("domain='%s' - AFTER!", domain)
109
110         logger.debug("domain='%s' - AFTER2!", domain)
111         if domain == "":
112             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
113             continue
114
115         logger.debug("domain='%s' - BEFORE!", domain)
116         domain = domain.encode("idna").decode("utf-8")
117         logger.debug("domain='%s' - AFTER!", domain)
118
119         if not domain_helper.is_wanted(domain):
120             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
121             continue
122
123         logger.debug("Appending domain='%s'", domain)
124         domains.append(domain)
125
126     logger.debug("domains()=%d - EXIT!", len(domains))
127     return domains
128
129 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
130     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
131     domain_helper.raise_on(blocker)
132
133     if validators.domain(domain) and blacklist.is_blacklisted(domain):
134         raise ValueError(f"domain='{domain}' is blacklisted but function was invoked")
135     elif not isinstance(domain_hash, str) and domain_hash is not None:
136         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
137
138     logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
139     instances.set_has_obfuscation(blocker, False)
140
141     logger.debug("Checking domain='%s' ...", domain)
142     if domain.find("*") >= 0:
143         logger.debug("blocker='%s' uses obfuscated domains", blocker)
144         instances.set_has_obfuscation(blocker, True)
145
146         # Obscured domain name with no hash
147         row = instances.deobfuscate("*", domain, domain_hash)
148
149         logger.debug("row[]='%s'", type(row))
150         if row is not None:
151             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
152             domain = row["domain"]
153         else:
154             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
155     elif domain.find("?") >= 0:
156         logger.debug("blocker='%s' uses obfuscated domains", blocker)
157         instances.set_has_obfuscation(blocker, True)
158
159         # Obscured domain name with no hash
160         row = instances.deobfuscate("?", domain, domain_hash)
161
162         logger.debug("row[]='%s'", type(row))
163         if row is not None:
164             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
165             domain = row["domain"]
166         else:
167             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
168     else:
169         logger.debug("domain='%s' is not obfuscated", domain)
170
171     logger.debug("domain='%s' - EXIT!", domain)
172     return domain
173
174 def base_url() -> str:
175     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"