]> git.mxchange.org Git - fba.git/blob - fba/utils.py
Continued:
[fba.git] / fba / utils.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import hashlib
18 import logging
19
20 import bs4
21 import validators
22
23 from fba.helpers import blacklist
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
27
28 from fba.models import instances
29
30 logging.basicConfig(level=logging.INFO)
31 logger = logging.getLogger(__name__)
32 #logger.setLevel(logging.DEBUG)
33
34 ##### Other functions #####
35
36 def is_primitive(var: any) -> bool:
37     logger.debug("var[]='%s' - CALLED!", type(var))
38     return type(var) in [int, str, float, bool, None] or var is None
39
40 def hash_domain(domain: str) -> str:
41     logger.debug("domain='%s' - CALLED!", domain)
42     domain_helper.raise_on(domain)
43
44     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
45
46 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
47     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
48
49     if not isinstance(tags, bs4.element.ResultSet):
50         raise TypeError(f"Parameter tags[]='{type(tags)}' has not expected type 'ResultSet'")
51     elif not isinstance(search, str):
52         raise TypeError(f"Parameter search[]='{type(search)}' has not expected type 'str'")
53     elif search == "":
54         raise ValueError("Parameter 'search' is an empty string")
55
56     domains = []
57     logger.debug("Parsing %d tags ...", len(tags))
58     for tag in tags:
59         logger.debug("tag[]='%s'", type(tag))
60         domain = tag.find(search).contents[0]
61
62         logger.debug("domain='%s' - BEFORE! #1", domain)
63         domain = tidyup.domain(domain) if domain not in ["", None] else None
64         logger.debug("domain='%s' - AFTER! #2", domain)
65
66         if domain in [None, ""]:
67             logger.debug("tag='%s' has no domain, trying <em> ...", tag)
68             domain = tag.find("em").contents[0]
69
70             logger.debug("domain='%s' - BEFORE! #2", domain)
71             domain = tidyup.domain(domain) if domain not in ["", None] else None
72             logger.debug("domain='%s' - AFTER! #2", domain)
73
74         logger.debug("domain='%s' - AFTER2!", domain)
75         if domain == "":
76             logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
77             continue
78         elif domain == "noagendasocial.com/noagenda.social":
79             logger.debug("domain='%s' is a double-domain entry, adding all ...", domain)
80             add_all_to_list(domains, domain, "/")
81
82             logger.debug("domain='%s' - SKIPPING!", domain)
83             continue
84         elif "," in domain:
85             logger.debug("domain='%s' contains a comma-separated list of domains, adding all ...", domain)
86             add_all_to_list(domains, domain, ",")
87
88             logger.debug("domain='%s' - SKIPPING!", domain)
89             continue
90         elif not validators.domain(domain, rfc_2782=True):
91             logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
92             continue
93
94         logger.debug("domain='%s' - BEFORE!", domain)
95         domain = domain_helper.encode_idna(domain)
96         logger.debug("domain='%s' - AFTER!", domain)
97
98         if not domain_helper.is_wanted(domain):
99             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
100             continue
101
102         logger.debug("Appending domain='%s' ...", domain)
103         domains.append(domain)
104
105     logger.debug("domains()=%d - EXIT!", len(domains))
106     return domains
107
108 def add_all_to_list(domains: list, source: str, splitter: str) -> None:
109     logger.debug("domains()=%d,source='%s',splitter='%s' - CALLED!")
110     if not isinstance(domains, list):
111         raise TypeError(f"Parameter domains[]='{type(domains)}' is not type 'list'")
112     elif not isinstance(source, str):
113         raise TypeError(f"Parameter source[]='{type(source)}' is not type 'list'")
114     elif source == "":
115         raise ValueError("Parameter 'source' is an empty string")
116     elif not isinstance(splitter, str):
117         raise TypeError(f"Parameter splitter[]='{type(splitter)}' is not type 'list'")
118     elif splitter == "":
119         raise ValueError("Parameter 'splitter' is an empty string")
120
121     for domain in source.split(splitter):
122         logger.debug("domain='%s' - LOOP!", domain)
123         domain = domain.strip()
124         if not domain_helper.is_wanted(domain):
125             logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
126             continue
127
128         logger.debug("Appending domain='%s' ...", domain)
129         domains.append(domain)
130
131     logger.debug("EXIT!")
132
133 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
134     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
135     domain_helper.raise_on(blocker)
136
137     if validators.domain(domain, rfc_2782=True) and blacklist.is_blacklisted(domain):
138         raise RuntimeError(f"domain='{domain}' is blacklisted but function was invoked")
139     elif not isinstance(domain_hash, str) and domain_hash is not None:
140         raise TypeError(f"Parameter domain_hash[]='{type(domain_hash)}' has not expected type 'str'")
141     elif domain_hash == "":
142         raise ValueError("Parameter 'domain_hash' is an empty string")
143
144     logger.debug("Checking domain='%s' ...", domain)
145     if domain.find("*") >= 0:
146         logger.debug("blocker='%s' uses obfuscated domains", blocker)
147         instances.set_has_obfuscation(blocker, True)
148
149         # Obscured domain name with no hash
150         row = instances.deobfuscate("*", domain, domain_hash)
151
152         logger.debug("row[]='%s'", type(row))
153         if row is not None:
154             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
155             domain = row["domain"]
156         else:
157             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
158     elif domain.find("?") >= 0:
159         logger.debug("blocker='%s' uses obfuscated domains", blocker)
160         instances.set_has_obfuscation(blocker, True)
161
162         # Obscured domain name with no hash
163         row = instances.deobfuscate("?", domain, domain_hash)
164
165         logger.debug("row[]='%s'", type(row))
166         if row is not None:
167             logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
168             domain = row["domain"]
169         else:
170             logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
171     else:
172         logger.debug("domain='%s' is not obfuscated", domain)
173
174     logger.debug("domain='%s' - EXIT!", domain)
175     return domain
176
177 def base_url() -> str:
178     return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"