]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
27
28 from fba.http import network
29
30 from fba.models import instances
31
32 logging.basicConfig(level=logging.INFO)
33 logger = logging.getLogger(__name__)
34
35 ##### Other functions #####
36
37 def is_primitive(var: any) -> bool:
38     logger.debug("var[]='%s' - CALLED!", type(var))
39     return type(var) in {int, str, float, bool, None} or var is None
40
41 def get_hash(domain: str) -> str:
42     logger.debug("domain='%s' - CALLED!", domain)
43     domain_helper.raise_on(domain)
44
45     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
46
47 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
48     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
49
50     if not isinstance(url, str):
51         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
52     elif url == "":
53         raise ValueError("Parameter 'url' is empty")
54     elif not isinstance(headers, dict):
55         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
56     elif not isinstance(timeout, tuple):
57         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
58
59     logger.debug("Parsing url='%s' ...", url)
60     components = urlparse(url)
61
62     # Invoke other function, avoid trailing ?
63     logger.debug("components[%s]='%s'", type(components), components)
64     if components.query != "":
65         response = network.fetch_response(components.netloc.split(":")[0], f"{components.path}?{components.query}", headers, timeout)
66     else:
67         response = network.fetch_response(components.netloc.split(":")[0], components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
68
69     logger.debug("response[]='%s' - EXIT!", type(response))
70     return response
71
72 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
73     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
74
75     if not isinstance(tags, bs4.element.ResultSet):
76         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
77     elif not isinstance(search, str):
78         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
79     elif search == "":
80         raise ValueError("Parameter 'search' is empty")
81
82     domains = list()
83     logger.debug("Parsing %d tags ...", len(tags))
84     for tag in tags:
85         logger.debug("tag[]='%s'", type(tag))
86         domain = tidyup.domain(tag.find(search).contents[0])
87         logger.debug("domain='%s' - AFTER!", domain)
88
89         if domain == "":
90             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
91             domain = tidyup.domain(tag.find("em").contents[0])
92             logger.debug("domain='%s' - AFTER!", domain)
93
94         if domain == "":
95             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
96             continue
97
98         logger.debug("domain='%s' - BEFORE!", domain)
99         domain = domain.encode("idna").decode("utf-8")
100         logger.debug("domain='%s' - AFTER!", domain)
101
102         if not domain_helper.is_wanted(domain):
103             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
104             continue
105
106         logger.debug("Appending domain='%s'", domain)
107         domains.append(domain)
108
109     logger.debug("domains()=%d - EXIT!", len(domains))
110     return domains
111
112 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
113     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
114     domain_helper.raise_on(blocker)
115
116     if not isinstance(domain, str):
117         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
118     elif domain == "":
119         raise ValueError("Parameter domain is empty")
120     elif not isinstance(domain_hash, str) and domain_hash is not None:
121         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
122
123     logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
124     instances.set_has_obfuscation(blocker, False)
125
126     if domain.find("*") >= 0:
127         logger.debug("blocker='%s' uses obfuscated domains", blocker)
128         instances.set_has_obfuscation(blocker, True)
129
130         # Obscured domain name with no hash
131         row = instances.deobfuscate("*", domain, domain_hash)
132
133         logger.debug("row[]='%s'", type(row))
134         if row is not None:
135             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
136             domain = row["domain"]
137         else:
138             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
139     elif domain.find("?") >= 0:
140         logger.debug("blocker='%s' uses obfuscated domains", blocker)
141         instances.set_has_obfuscation(blocker, True)
142
143         # Obscured domain name with no hash
144         row = instances.deobfuscate("?", domain, domain_hash)
145
146         logger.debug("row[]='%s'", type(row))
147         if row is not None:
148             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
149             domain = row["domain"]
150         else:
151             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
152     else:
153         logger.debug("domain='%s' is not obfuscated", domain)
154
155     logger.debug("domain='%s' - EXIT!", domain)
156     return domain
157
158 def base_url() -> str:
159     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"