]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
27
28 from fba.http import network
29
30 from fba.models import instances
31
32 logging.basicConfig(level=logging.INFO)
33 logger = logging.getLogger(__name__)
34
35 ##### Other functions #####
36
37 def is_primitive(var: any) -> bool:
38     logger.debug("var[]='%s' - CALLED!", type(var))
39     return type(var) in {int, str, float, bool, None} or var is None
40
41 def get_hash(domain: str) -> str:
42     logger.debug("domain='%s' - CALLED!", domain)
43     domain_helper.raise_on(domain)
44
45     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
46
47 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
48     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
49
50     if not isinstance(url, str):
51         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
52     elif url == "":
53         raise ValueError("Parameter 'url' is empty")
54     elif not isinstance(headers, dict):
55         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
56     elif not isinstance(timeout, tuple):
57         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
58
59     logger.debug("Parsing url='%s' ...", url)
60     components = urlparse(url)
61
62     # Invoke other function, avoid trailing ?
63     logger.debug("components[%s]='%s'", type(components), components)
64     if components.query != "":
65         response = network.fetch_response(
66             components.netloc.split(":")[0],
67             f"{components.path}?{components.query}",
68             headers,
69             timeout
70         )
71     else:
72         response = network.fetch_response(
73             components.netloc.split(":")[0],
74             components.path if isinstance(components.path, str) and components.path != '' else '/',
75             headers,
76             timeout
77         )
78
79     logger.debug("response[]='%s' - EXIT!", type(response))
80     return response
81
82 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
83     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
84
85     if not isinstance(tags, bs4.element.ResultSet):
86         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
87     elif not isinstance(search, str):
88         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
89     elif search == "":
90         raise ValueError("Parameter 'search' is empty")
91
92     domains = list()
93     logger.debug("Parsing %d tags ...", len(tags))
94     for tag in tags:
95         logger.debug("tag[]='%s'", type(tag))
96         domain = tidyup.domain(tag.find(search).contents[0])
97         logger.debug("domain='%s' - AFTER!", domain)
98
99         if domain == "":
100             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
101             domain = tidyup.domain(tag.find("em").contents[0])
102             logger.debug("domain='%s' - AFTER!", domain)
103
104         if domain == "":
105             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
106             continue
107
108         logger.debug("domain='%s' - BEFORE!", domain)
109         domain = domain.encode("idna").decode("utf-8")
110         logger.debug("domain='%s' - AFTER!", domain)
111
112         if not domain_helper.is_wanted(domain):
113             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
114             continue
115
116         logger.debug("Appending domain='%s'", domain)
117         domains.append(domain)
118
119     logger.debug("domains()=%d - EXIT!", len(domains))
120     return domains
121
122 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
123     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
124     domain_helper.raise_on(blocker)
125
126     if not isinstance(domain, str):
127         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
128     elif domain == "":
129         raise ValueError("Parameter domain is empty")
130     elif not isinstance(domain_hash, str) and domain_hash is not None:
131         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
132
133     logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
134     instances.set_has_obfuscation(blocker, False)
135
136     if domain.find("*") >= 0:
137         logger.debug("blocker='%s' uses obfuscated domains", blocker)
138         instances.set_has_obfuscation(blocker, True)
139
140         # Obscured domain name with no hash
141         row = instances.deobfuscate("*", domain, domain_hash)
142
143         logger.debug("row[]='%s'", type(row))
144         if row is not None:
145             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
146             domain = row["domain"]
147         else:
148             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
149     elif domain.find("?") >= 0:
150         logger.debug("blocker='%s' uses obfuscated domains", blocker)
151         instances.set_has_obfuscation(blocker, True)
152
153         # Obscured domain name with no hash
154         row = instances.deobfuscate("?", domain, domain_hash)
155
156         logger.debug("row[]='%s'", type(row))
157         if row is not None:
158             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
159             domain = row["domain"]
160         else:
161             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
162     else:
163         logger.debug("domain='%s' is not obfuscated", domain)
164
165     logger.debug("domain='%s' - EXIT!", domain)
166     return domain
167
168 def base_url() -> str:
169     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"