]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1         # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import logging
18
19 from urllib.parse import urlparse
20
21 import bs4
22 import requests
23 import validators
24
25 from fba.helpers import blacklist
26 from fba.helpers import domain as domain_helper
27 from fba.helpers import tidyup
28
29 from fba.http import network
30
31 from fba.models import instances
32
33 logging.basicConfig(level=logging.INFO)
34 logger = logging.getLogger(__name__)
35
36 ##### Other functions #####
37
38 def is_primitive(var: any) -> bool:
39     logger.debug("var[]='%s' - CALLED!", type(var))
40     return type(var) in {int, str, float, bool, None} or var is None
41
42 def get_hash(domain: str) -> str:
43     logger.debug("domain='%s' - CALLED!", domain)
44     domain_helper.raise_on(domain)
45
46     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
47
48 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
49     logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
50     if not isinstance(url, str):
51         raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
52     elif url == "":
53         raise ValueError("Parameter 'url' is empty")
54     elif not isinstance(headers, dict):
55         raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
56     elif not isinstance(timeout, tuple):
57         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
58
59     logger.debug("Parsing url='%s' ...", url)
60     components = urlparse(url)
61
62     # Invoke other function, avoid trailing ?
63     logger.debug("components[%s]='%s'", type(components), components)
64     if components.query != "":
65         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
66     else:
67         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
68
69     logger.debug("response[]='%s' - EXIT!", type(response))
70     return response
71
72 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
73     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
74     if not isinstance(tags, bs4.element.ResultSet):
75         raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
76     elif not isinstance(search, str):
77         raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
78     elif search == "":
79         raise ValueError("Parameter 'search' is empty")
80
81     domains = list()
82     logger.debug("Parsing %d tags ...", len(tags))
83     for tag in tags:
84         logger.debug("tag[]='%s'", type(tag))
85         domain = tidyup.domain(tag.find(search).contents[0])
86         logger.debug("domain='%s' - AFTER!", domain)
87
88         if domain == "":
89             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
90             domain = tidyup.domain(tag.find("em").contents[0])
91             logger.debug("domain='%s' - AFTER!", domain)
92
93         if domain == "":
94             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
95             continue
96
97         logger.debug("domain='%s' - BEFORE!", domain)
98         domain = domain.encode("idna").decode("utf-8")
99         logger.debug("domain='%s' - AFTER!", domain)
100
101         if not is_domain_wanted(domain):
102             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
103             continue
104
105         logger.debug("Appending domain='%s'", domain)
106         domains.append(domain)
107
108     logger.debug("domains()=%d - EXIT!", len(domains))
109     return domains
110
111 def is_domain_wanted(domain: str) -> bool:
112     logger.debug("domain='%s' - CALLED!", domain)
113
114     wanted = True
115     if not isinstance(domain, str):
116         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
117     elif domain == "":
118         raise ValueError("Parameter 'domain' is empty")
119     elif domain.lower() != domain:
120         wanted = False
121     elif not validators.domain(domain.split("/")[0]):
122         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
123         wanted = False
124     elif domain.endswith(".arpa"):
125         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
126         wanted = False
127     elif domain.endswith(".onion"):
128         logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
129         wanted = False
130     elif domain.endswith(".tld"):
131         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
132         wanted = False
133     elif blacklist.is_blacklisted(domain):
134         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
135         wanted = False
136     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
137         logger.debug("domain='%s' is a single user", domain)
138         wanted = False
139     elif domain.find("/tag/") > 0:
140         logger.debug("domain='%s' is a tag", domain)
141         wanted = False
142
143     logger.debug("wanted='%s' - EXIT!", wanted)
144     return wanted
145
146 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
147     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
148     domain_helper.raise_on(blocker)
149
150     if not isinstance(domain, str):
151         raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
152     elif domain == "":
153         raise ValueError("Parameter domain is empty")
154     elif not isinstance(domain_hash, str) and domain_hash is not None:
155         raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
156
157     if domain.find("*") >= 0:
158         logger.debug("blocker='%s' uses obfuscated domains", blocker)
159
160         # Obscured domain name with no hash
161         row = instances.deobfuscate("*", domain, domain_hash)
162
163         logger.debug("row[]='%s'", type(row))
164         if row is not None:
165             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
166             domain = row["domain"]
167         else:
168             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
169             instances.set_has_obfuscation(blocker, True)
170     elif domain.find("?") >= 0:
171         logger.debug("blocker='%s' uses obfuscated domains", blocker)
172
173         # Obscured domain name with no hash
174         row = instances.deobfuscate("?", domain, domain_hash)
175
176         logger.debug("row[]='%s'", type(row))
177         if row is not None:
178             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
179             domain = row["domain"]
180         else:
181             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
182             instances.set_has_obfuscation(blocker, True)
183     else:
184         logger.debug("domain='%s' is not obfuscated", domain)
185
186     logger.debug("domain='%s' - EXIT!", domain)
187     return domain