1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
23 from fba.helpers import blacklist
24 from fba.helpers import config
25 from fba.helpers import domain as domain_helper
26 from fba.helpers import tidyup
28 from fba.models import instances
30 logging.basicConfig(level=logging.INFO)
31 logger = logging.getLogger(__name__)
32 #logger.setLevel(logging.DEBUG)
34 ##### Other functions #####
36 def is_primitive(var: any) -> bool:
37 logger.debug("var[]='%s' - CALLED!", type(var))
38 return type(var) in [int, str, float, bool, None] or var is None
40 def hash_domain(domain: str) -> str:
41 logger.debug("domain='%s' - CALLED!", domain)
42 domain_helper.raise_on(domain)
44 return hashlib.sha256(domain.encode("utf-8")).hexdigest()
46 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
47 logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
49 if not isinstance(tags, bs4.element.ResultSet):
50 raise TypeError(f"Parameter tags[]='{type(tags)}' has not expected type 'ResultSet'")
51 elif not isinstance(search, str):
52 raise TypeError(f"Parameter search[]='{type(search)}' has not expected type 'str'")
54 raise ValueError("Parameter 'search' is an empty string")
57 logger.debug("Parsing %d tags ...", len(tags))
59 logger.debug("tag[]='%s'", type(tag))
60 domain = tag.find(search).contents[0]
62 logger.debug("domain='%s' - BEFORE! #1", domain)
63 domain = tidyup.domain(domain) if domain not in ["", None] else None
64 logger.debug("domain='%s' - AFTER! #2", domain)
66 if domain in [None, ""]:
67 logger.debug("tag='%s' has no domain, trying <em> ...", tag)
68 domain = tag.find("em").contents[0]
70 logger.debug("domain='%s' - BEFORE! #2", domain)
71 domain = tidyup.domain(domain) if domain not in ["", None] else None
72 logger.debug("domain='%s' - AFTER! #2", domain)
74 logger.debug("domain='%s' - AFTER2!", domain)
76 logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
78 elif domain == "noagendasocial.com/noagenda.social":
79 logger.debug("domain='%s' is a double-domain entry, adding all ...", domain)
80 add_all_to_list(domains, domain, "/")
82 logger.debug("domain='%s' - SKIPPING!", domain)
85 logger.debug("domain='%s' contains a comma-separated list of domains, adding all ...", domain)
86 add_all_to_list(domains, domain, ",")
88 logger.debug("domain='%s' - SKIPPING!", domain)
90 elif not validators.domain(domain, rfc_2782=True):
91 logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
94 logger.debug("domain='%s' - BEFORE!", domain)
95 domain = domain_helper.encode_idna(domain)
96 logger.debug("domain='%s' - AFTER!", domain)
98 if not domain_helper.is_wanted(domain):
99 logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
102 logger.debug("Appending domain='%s' ...", domain)
103 domains.append(domain)
105 logger.debug("domains()=%d - EXIT!", len(domains))
108 def add_all_to_list(domains: list, source: str, splitter: str) -> None:
109 logger.debug("domains()=%d,source='%s',splitter='%s' - CALLED!")
110 if not isinstance(domains, list):
111 raise TypeError(f"Parameter domains[]='{type(domains)}' is not type 'list'")
112 elif not isinstance(source, str):
113 raise TypeError(f"Parameter source[]='{type(source)}' is not type 'list'")
115 raise ValueError("Parameter 'source' is an empty string")
116 elif not isinstance(splitter, str):
117 raise TypeError(f"Parameter splitter[]='{type(splitter)}' is not type 'list'")
119 raise ValueError("Parameter 'splitter' is an empty string")
121 for domain in source.split(splitter):
122 logger.debug("domain='%s' - LOOP!", domain)
123 domain = domain.strip()
124 if not domain_helper.is_wanted(domain):
125 logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
128 logger.debug("Appending domain='%s' ...", domain)
129 domains.append(domain)
131 logger.debug("EXIT!")
133 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
134 logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
135 domain_helper.raise_on(blocker)
137 if validators.domain(domain, rfc_2782=True) and blacklist.is_blacklisted(domain):
138 raise RuntimeError(f"domain='{domain}' is blacklisted but function was invoked")
139 elif not isinstance(domain_hash, str) and domain_hash is not None:
140 raise TypeError(f"Parameter domain_hash[]='{type(domain_hash)}' has not expected type 'str'")
141 elif domain_hash == "":
142 raise ValueError("Parameter 'domain_hash' is an empty string")
144 logger.debug("Checking domain='%s' ...", domain)
145 if domain.find("*") >= 0:
146 logger.debug("blocker='%s' uses obfuscated domains", blocker)
147 instances.set_has_obfuscation(blocker, True)
149 # Obscured domain name with no hash
150 row = instances.deobfuscate("*", domain, domain_hash)
152 logger.debug("row[]='%s'", type(row))
154 logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
155 domain = row["domain"]
157 logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
158 elif domain.find("?") >= 0:
159 logger.debug("blocker='%s' uses obfuscated domains", blocker)
160 instances.set_has_obfuscation(blocker, True)
162 # Obscured domain name with no hash
163 row = instances.deobfuscate("?", domain, domain_hash)
165 logger.debug("row[]='%s'", type(row))
167 logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
168 domain = row["domain"]
170 logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
172 logger.debug("domain='%s' is not obfuscated", domain)
174 logger.debug("domain='%s' - EXIT!", domain)
177 def base_url() -> str:
178 return f"{config.get('scheme')}://{config.get('hostname')}{config.get('base_url')}"