]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import config
27 from fba.helpers import domain as domain_helper
28 from fba.helpers import tidyup
29
30 from fba.http import network
31
32 from fba.models import instances
33
34 logging.basicConfig(level=logging.INFO)
35 logger = logging.getLogger(__name__)
36
37 ##### Other functions #####
38
39 def is_primitive(var: any) -> bool:
40     logger.debug("var[]='%s' - CALLED!", type(var))
41     return type(var) in {int, str, float, bool, None} or var is None
42
43 def get_hash(domain: str) -> str:
44     logger.debug("domain='%s' - CALLED!", domain)
45     domain_helper.raise_on(domain)
46
47     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48
49 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
50     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
51
52     if not isinstance(url, str):
53         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
54     elif url == "":
55         raise ValueError("Parameter 'url' is empty")
56     elif not validators.url(url):
57         raise ValueError(f"Parameter url='{url}' is not a valid URL")
58     elif not isinstance(headers, dict):
59         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
60     elif not isinstance(timeout, tuple):
61         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
62
63     logger.debug("Parsing url='%s' ...", url)
64     components = urlparse(url)
65
66     # Invoke other function, avoid trailing ?
67     logger.debug("components[%s]='%s'", type(components), components)
68     if components.query != "":
69         response = network.fetch_response(
70             components.netloc.split(":")[0],
71             f"{components.path}?{components.query}",
72             headers,
73             timeout
74         )
75     else:
76         response = network.fetch_response(
77             components.netloc.split(":")[0],
78             components.path if isinstance(components.path, str) and components.path != '' else '/',
79             headers,
80             timeout
81         )
82
83     logger.debug("response[]='%s' - EXIT!", type(response))
84     return response
85
86 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
87     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
88
89     if not isinstance(tags, bs4.element.ResultSet):
90         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
91     elif not isinstance(search, str):
92         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
93     elif search == "":
94         raise ValueError("Parameter 'search' is empty")
95
96     domains = list()
97     logger.debug("Parsing %d tags ...", len(tags))
98     for tag in tags:
99         logger.debug("tag[]='%s'", type(tag))
100         domain = tidyup.domain(tag.find(search).contents[0])
101         logger.debug("domain='%s' - AFTER!", domain)
102
103         if domain == "":
104             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
105             domain = tidyup.domain(tag.find("em").contents[0])
106             logger.debug("domain='%s' - AFTER!", domain)
107
108         if domain == "":
109             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
110             continue
111
112         logger.debug("domain='%s' - BEFORE!", domain)
113         domain = domain.encode("idna").decode("utf-8")
114         logger.debug("domain='%s' - AFTER!", domain)
115
116         if not domain_helper.is_wanted(domain):
117             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
118             continue
119
120         logger.debug("Appending domain='%s'", domain)
121         domains.append(domain)
122
123     logger.debug("domains()=%d - EXIT!", len(domains))
124     return domains
125
126 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
127     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
128     domain_helper.raise_on(blocker)
129
130     if validators.domain(domain) and blacklist.is_blacklisted(domain):
131         raise ValueError(f"domain='{domain}' is blacklisted but function was invoked")
132     elif not isinstance(domain_hash, str) and domain_hash is not None:
133         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
134
135     logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
136     instances.set_has_obfuscation(blocker, False)
137
138     if domain.find("*") >= 0:
139         logger.debug("blocker='%s' uses obfuscated domains", blocker)
140         instances.set_has_obfuscation(blocker, True)
141
142         # Obscured domain name with no hash
143         row = instances.deobfuscate("*", domain, domain_hash)
144
145         logger.debug("row[]='%s'", type(row))
146         if row is not None:
147             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
148             domain = row["domain"]
149         else:
150             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
151     elif domain.find("?") >= 0:
152         logger.debug("blocker='%s' uses obfuscated domains", blocker)
153         instances.set_has_obfuscation(blocker, True)
154
155         # Obscured domain name with no hash
156         row = instances.deobfuscate("?", domain, domain_hash)
157
158         logger.debug("row[]='%s'", type(row))
159         if row is not None:
160             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
161             domain = row["domain"]
162         else:
163             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
164     else:
165         logger.debug("domain='%s' is not obfuscated", domain)
166
167     logger.debug("domain='%s' - EXIT!", domain)
168     return domain
169
170 def base_url() -> str:
171     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"