]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23
24 from fba.helpers import blacklist
25 from fba.helpers import config
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import network
30
31 from fba.models import instances
32
33 logging.basicConfig(level=logging.INFO)
34 logger = logging.getLogger(__name__)
35
36 ##### Other functions #####
37
38 def is_primitive(var: any) -> bool:
39     logger.debug("var[]='%s' - CALLED!", type(var))
40     return type(var) in {int, str, float, bool, None} or var is None
41
42 def get_hash(domain: str) -> str:
43     logger.debug("domain='%s' - CALLED!", domain)
44     domain_helper.raise_on(domain)
45
46     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
47
48 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
49     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
50
51     if not isinstance(url, str):
52         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
53     elif url == "":
54         raise ValueError("Parameter 'url' is empty")
55     elif not isinstance(headers, dict):
56         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
57     elif not isinstance(timeout, tuple):
58         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
59
60     logger.debug("Parsing url='%s' ...", url)
61     components = urlparse(url)
62
63     # Invoke other function, avoid trailing ?
64     logger.debug("components[%s]='%s'", type(components), components)
65     if components.query != "":
66         response = network.fetch_response(
67             components.netloc.split(":")[0],
68             f"{components.path}?{components.query}",
69             headers,
70             timeout
71         )
72     else:
73         response = network.fetch_response(
74             components.netloc.split(":")[0],
75             components.path if isinstance(components.path, str) and components.path != '' else '/',
76             headers,
77             timeout
78         )
79
80     logger.debug("response[]='%s' - EXIT!", type(response))
81     return response
82
83 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
84     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
85
86     if not isinstance(tags, bs4.element.ResultSet):
87         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
88     elif not isinstance(search, str):
89         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
90     elif search == "":
91         raise ValueError("Parameter 'search' is empty")
92
93     domains = list()
94     logger.debug("Parsing %d tags ...", len(tags))
95     for tag in tags:
96         logger.debug("tag[]='%s'", type(tag))
97         domain = tidyup.domain(tag.find(search).contents[0])
98         logger.debug("domain='%s' - AFTER!", domain)
99
100         if domain == "":
101             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
102             domain = tidyup.domain(tag.find("em").contents[0])
103             logger.debug("domain='%s' - AFTER!", domain)
104
105         if domain == "":
106             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
107             continue
108
109         logger.debug("domain='%s' - BEFORE!", domain)
110         domain = domain.encode("idna").decode("utf-8")
111         logger.debug("domain='%s' - AFTER!", domain)
112
113         if not domain_helper.is_wanted(domain):
114             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
115             continue
116
117         logger.debug("Appending domain='%s'", domain)
118         domains.append(domain)
119
120     logger.debug("domains()=%d - EXIT!", len(domains))
121     return domains
122
123 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
124     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
125     domain_helper.raise_on(blocker)
126
127     if blacklist.is_blacklisted(domain):
128         raise ValueError(f"domain='{domain}' is blacklisted but function was invoked")
129     elif not isinstance(domain_hash, str) and domain_hash is not None:
130         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
131
132     logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
133     instances.set_has_obfuscation(blocker, False)
134
135     if domain.find("*") >= 0:
136         logger.debug("blocker='%s' uses obfuscated domains", blocker)
137         instances.set_has_obfuscation(blocker, True)
138
139         # Obscured domain name with no hash
140         row = instances.deobfuscate("*", domain, domain_hash)
141
142         logger.debug("row[]='%s'", type(row))
143         if row is not None:
144             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
145             domain = row["domain"]
146         else:
147             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
148     elif domain.find("?") >= 0:
149         logger.debug("blocker='%s' uses obfuscated domains", blocker)
150         instances.set_has_obfuscation(blocker, True)
151
152         # Obscured domain name with no hash
153         row = instances.deobfuscate("?", domain, domain_hash)
154
155         logger.debug("row[]='%s'", type(row))
156         if row is not None:
157             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
158             domain = row["domain"]
159         else:
160             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
161     else:
162         logger.debug("domain='%s' is not obfuscated", domain)
163
164     logger.debug("domain='%s' - EXIT!", domain)
165     return domain
166
167 def base_url() -> str:
168     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"