]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import config
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import network
31
32 from fba.models import instances
33
34 logging.basicConfig(level=logging.INFO)
35 logger = logging.getLogger(__name__)
36
37 ##### Other functions #####
38
39 def is_primitive(var: any) -> bool:
40     logger.debug("var[]='%s' - CALLED!", type(var))
41     return type(var) in {int, str, float, bool, None} or var is None
42
43 def get_hash(domain: str) -> str:
44     logger.debug("domain='%s' - CALLED!", domain)
45     domain_helper.raise_on(domain)
46
47     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48
49 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
50     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
51
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not validators.url(url):
57         raise ValueError(f"Parameter url='{url}' is not a valid URL")
58     elif not isinstance(headers, dict):
59         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
60     elif not isinstance(timeout, tuple):
61         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
62
63     logger.debug("Parsing url='%s' ...", url)
64     components = urlparse(url)
65
66     # Invoke other function, avoid trailing ?
67     logger.debug("components[%s]='%s'", type(components), components)
68     if components.query != "":
69         logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc)
70         response = network.fetch_response(
71             components.netloc.split(":")[0],
72             f"{components.path}?{components.query}",
73             headers,
74             timeout
75         )
76     else:
77         logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
78         response = network.fetch_response(
79             components.netloc.split(":")[0],
80             components.path if isinstance(components.path, str) and components.path != '' else '/',
81             headers,
82             timeout
83         )
84
85     logger.debug("response[]='%s' - EXIT!", type(response))
86     return response
87
88 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
89     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
90
91     if not isinstance(tags, bs4.element.ResultSet):
92         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
93     elif not isinstance(search, str):
94         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
95     elif search == "":
96         raise ValueError("Parameter 'search' is empty")
97
98     domains = list()
99     logger.debug("Parsing %d tags ...", len(tags))
100     for tag in tags:
101         logger.debug("tag[]='%s'", type(tag))
102         domain = tidyup.domain(tag.find(search).contents[0])
103         logger.debug("domain='%s' - AFTER!", domain)
104
105         if domain == "":
106             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
107             domain = tidyup.domain(tag.find("em").contents[0])
108             logger.debug("domain='%s' - AFTER!", domain)
109
110         logger.debug("domain='%s' - AFTER2!", domain)
111         if domain == "":
112             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
113             continue
114
115         logger.debug("domain='%s' - BEFORE!", domain)
116         domain = domain_helper.encode_idna(domain)
117         logger.debug("domain='%s' - AFTER!", domain)
118
119         if not domain_helper.is_wanted(domain):
120             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
121             continue
122
123         logger.debug("Appending domain='%s'", domain)
124         domains.append(domain)
125
126     logger.debug("domains()=%d - EXIT!", len(domains))
127     return domains
128
129 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
130     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
131     domain_helper.raise_on(blocker)
132
133     if validators.domain(domain, rfc_2782=True) and blacklist.is_blacklisted(domain):
134         raise ValueError(f"domain='{domain}' is blacklisted but function was invoked")
135     elif not isinstance(domain_hash, str) and domain_hash is not None:
136         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
137
138     logger.debug("Checking domain='%s' ...", domain)
139     if domain.find("*") >= 0:
140         logger.debug("blocker='%s' uses obfuscated domains", blocker)
141         instances.set_has_obfuscation(blocker, True)
142
143         # Obscured domain name with no hash
144         row = instances.deobfuscate("*", domain, domain_hash)
145
146         logger.debug("row[]='%s'", type(row))
147         if row is not None:
148             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
149             domain = row["domain"]
150         else:
151             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
152     elif domain.find("?") >= 0:
153         logger.debug("blocker='%s' uses obfuscated domains", blocker)
154         instances.set_has_obfuscation(blocker, True)
155
156         # Obscured domain name with no hash
157         row = instances.deobfuscate("?", domain, domain_hash)
158
159         logger.debug("row[]='%s'", type(row))
160         if row is not None:
161             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
162             domain = row["domain"]
163         else:
164             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
165     else:
166         logger.debug("domain='%s' is not obfuscated", domain)
167
168     logger.debug("domain='%s' - EXIT!", domain)
169     return domain
170
171 def base_url() -> str:
172     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"