1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
20 from fba import blacklist
21 from fba import blocks
23 from fba import federation
24 from fba import instances
25 from fba.helpers import tidyup
27 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
# Purpose: fetch the nodeinfo document of `domain` and record the blocks found
# under rows["metadata"]["federation"] through the blocks/instances helpers.
#
# Parameters (as enforced by the checks below):
#   domain       - str
#   origin       - str or None
#   nodeinfo_url - str, compared against "" below
# Raises: ValueError when one of the argument checks fails.
#
# NOTE(review): this listing has no indentation and its embedded line numbers
# skip, so some statements (conditions, continue/return/else keywords, the
# closing parts of multi-line expressions) are not visible here. The comments
# below describe only what the visible lines show.
28 # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
# --- Argument validation: every failed check raises ValueError ---
29 if not isinstance(domain, str):
30 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
# NOTE(review): the condition guarding the next raise is not visible in this listing.
32 raise ValueError("Parameter 'domain' is empty")
# origin may be None; any other non-str value is rejected.
33 elif not isinstance(origin, str) and origin is not None:
34 raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
# NOTE(review): the condition guarding the next raise is not visible in this listing.
36 raise ValueError("Parameter 'origin' is empty")
37 elif not isinstance(nodeinfo_url, str):
38 raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
39 elif nodeinfo_url == "":
40 raise ValueError("Parameter 'nodeinfo_url' is empty")
# --- Fetch nodeinfo through the federation helper ---
44 rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
# The block data is read from rows["metadata"]["federation"]; a warning is
# printed when the fetch failed or either key is absent.
47 print("WARNING: Could not fetch nodeinfo from domain:", domain)
49 elif "metadata" not in rows:
50 print(f"WARNING: rows()={len(rows)} does not have key 'metadata', domain='{domain}'")
52 elif "federation" not in rows["metadata"]:
53 print(f"WARNING: rows()={len(rows['metadata'])} does not have key 'federation', domain='{domain}'")
56 # DEBUG: print("DEBUG: Updating nodeinfo:", domain)
57 instances.update_last_nodeinfo(domain)
59 data = rows["metadata"]["federation"]
62 # DEBUG: print("DEBUG: Instance has no block list to analyze:", domain)
# --- Pass 1: block lists ---
# The entries of data["mrf_simple"] are merged with one extra entry,
# "quarantined_instances", and iterated as (block_level, blocklist).
# Presumably each blocklist is a list of domain names - TODO confirm.
65 if "mrf_simple" in data:
66 for block_level, blocklist in (
67 {**data["mrf_simple"],
# NOTE(review): data["quarantined_instances"] is read without a membership
# check, whereas the reasons pass below tests
# "quarantined_instances_info" in data first. A KeyError looks possible
# when the key is missing - confirm.
68 **{"quarantined_instances": data["quarantined_instances"]}}
70 # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
71 block_level = tidyup.domain(block_level)
72 # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
75 print("WARNING: block_level is now empty!")
78 # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
79 if len(blocklist) > 0:
80 for blocked in blocklist:
81 # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
82 blocked = tidyup.domain(blocked)
83 # DEBUG: print("DEBUG: AFTER blocked:", blocked)
86 print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
88 elif blacklist.is_blacklisted(blocked):
89 # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
# More than one "*" marks an obscured domain. Each "*" is replaced by "_"
# (the single-character wildcard of SQL LIKE) and the first matching row of
# the instances table, ordered by rowid, is fetched.
91 elif blocked.count("*") > 1:
92 # -ACK!-oma also started obscuring domains without hash
94 "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
96 searchres = fba.cursor.fetchone()
# NOTE(review): this DEBUG print is live, while the matching one in the
# reasons pass below is commented out.
98 print(f"DEBUG: searchres[]='{type(searchres)}'")
100 print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
# The looked-up row replaces both the blocked name and the nodeinfo_url
# parameter; the new nodeinfo_url is what instances.add() receives below.
103 blocked = searchres[0]
104 nodeinfo_url = searchres[1]
105 # DEBUG: print("DEBUG: Looked up domain:", blocked)
106 elif not validators.domain(blocked):
107 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
110 # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
# NOTE(review): validators.domain(blocked) is tested a second time here with
# the same warning text as a few lines above.
111 if not validators.domain(blocked):
112 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
# Names whose last dot-separated label is "arpa" only produce a warning here.
114 elif blocked.split(".")[-1] == "arpa":
115 print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
117 elif not instances.is_registered(blocked):
118 # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
# inspect.currentframe().f_code.co_name evaluates to the name of the function
# that is currently executing.
119 instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
# A block not yet stored is added with the literal "unknown" as third
# argument (presumably a reason placeholder - confirm against
# blocks.add_instance).
121 if not blocks.is_instance_blocked(domain, blocked, block_level):
122 # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
123 blocks.add_instance(domain, blocked, "unknown", block_level)
125 if block_level == "reject":
126 # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
132 # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
133 blocks.update_last_seen(domain, blocked, block_level)
135 # DEBUG: print(f"DEBUG: domain='{domain}' has returned zero rows, trying /about/more page ...")
# NOTE(review): the result is assigned to rows, but rows is not read again in
# the visible remainder of this function - confirm the fallback data is used.
136 rows = fetch_blocks_from_about(domain)
138 # DEBUG: print("DEBUG: Committing changes ...")
139 fba.connection.commit()
# --- Pass 2: block reasons ---
# data["mrf_simple_info"] is iterated as (block_level, info); info.items()
# yields (blocked, reason) pairs.
142 if "mrf_simple_info" in data:
143 # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
144 for block_level, info in (
145 {**data["mrf_simple_info"],
146 **(data["quarantined_instances_info"]
147 if "quarantined_instances_info" in data
150 # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
151 block_level = tidyup.domain(block_level)
152 # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
154 if block_level == "":
155 print("WARNING: block_level is now empty!")
158 # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
159 for blocked, reason in info.items():
160 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
161 blocked = tidyup.domain(blocked)
# None and "" both become None; any other value is passed through tidyup.reason().
162 reason = tidyup.reason(reason) if reason is not None and reason != "" else None
163 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
166 print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
168 elif not validators.domain(blocked):
169 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
171 elif blacklist.is_blacklisted(blocked):
172 # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
174 elif blocked.count("*") > 1:
175 # same domain guess as above, but for reasons field
177 "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
179 searchres = fba.cursor.fetchone()
181 # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
182 if searchres is None:
183 print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
186 blocked = searchres[0]
# NOTE(review): origin is overwritten here, but afterwards it only appears in
# a commented-out DEBUG line; instances.add() below is given `domain` instead.
187 origin = searchres[1]
188 nodeinfo_url = searchres[2]
190 # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
191 if blocked.split(".")[-1] == "arpa":
192 print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
194 elif not instances.is_registered(blocked):
195 # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
196 instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
198 # DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
# NOTE(review): reason was rebound above to tidyup.reason(...) or None, yet it
# is subscripted with ["reason"] here and below. With None this raises
# TypeError - confirm what tidyup.reason() returns and whether a guard is
# needed.
199 blocks.update_reason(reason["reason"], domain, blocked, block_level)
201 # DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
# blockdict is defined outside the visible lines; entries whose "blocked" value
# matches get their "reason" updated.
202 for entry in blockdict:
203 if entry["blocked"] == blocked:
204 # DEBUG: print("DEBUG: Updating entry reason:", blocked)
205 entry["reason"] = reason["reason"]
207 fba.connection.commit()
209 # DEBUG: print("DEBUG: EXIT!")
211 def fetch_blocks_from_about(domain: str) -> dict:
# Purpose: load the "/about/more" or "/about" page of `domain`, walk its <h3>
# headings and collect the rows of the table following each recognised
# heading. The visible result keys are "reject", "media_removal" and
# "followers_only".
#
# Parameters: domain - str
# Raises: ValueError when the argument checks fail.
#
# NOTE(review): this listing has no indentation and its embedded line numbers
# skip, so some statements (the openers of the dict literals, try:, return and
# break keywords) are not visible here. The comments below describe only what
# the visible lines show.
# NOTE(review): unlike most DEBUG lines in fetch_blocks, the DEBUG prints in
# this function are live.
212 print(f"DEBUG: domain='{domain}' - CALLED!")
213 if not isinstance(domain, str):
214 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
# NOTE(review): the condition guarding the next raise is not visible in this listing.
216 raise ValueError("Parameter 'domain' is empty")
218 print("DEBUG: Fetching mastodon blocks from domain:", domain)
# These four heading names are the keys read from `blocklist` further down;
# each starts with an empty list.
220 "Suspended servers": [],
221 "Filtered media" : [],
222 "Limited servers" : [],
223 "Silenced servers" : [],
# The two paths are tried in this order; the DEBUG text below indicates the
# loop stops at the first page that contains at least one <h3>.
227 for path in ("/about/more", "/about"):
229 print(f"DEBUG: Fetching path='{path}' from domain='{domain}' ...")
230 doc = bs4.BeautifulSoup(
231 network.fetch_response(
# Timeouts are taken from the configuration as a
# (connection_timeout, read_timeout) tuple.
235 (config.get("connection_timeout"), config.get("read_timeout"))
240 if len(doc.find_all("h3")) > 0:
241 print(f"DEBUG: path='{path}' had some headlines - BREAK!")
# NOTE(review): BaseException also covers KeyboardInterrupt and SystemExit -
# consider narrowing to the network errors that are actually expected.
244 except BaseException as exception:
245 print("ERROR: Cannot fetch from domain:", domain, exception)
246 instances.update_last_error(domain, exception)
249 print(f"DEBUG: doc[]='{type(doc)}'")
251 print(f"WARNING: Cannot find any 'h3' tags for domain='{domain}' - EXIT!")
254 for header in doc.find_all("h3"):
255 header_text = tidyup.reason(header.text)
257 print(f"DEBUG: header_text='{header_text}'")
# Headings found in language_mapping (defined outside the visible lines) are
# replaced by their mapped value before the blocklist lookup.
258 if header_text in language_mapping:
259 print(f"DEBUG: header_text='{header_text}'")
260 header_text = language_mapping[header_text]
262 print(f"WARNING: header_text='{header_text}' not found in language mapping table")
# NOTE(review): the test also accepts header_text.lower(), but the append
# below indexes blocklist[header_text] with the original casing. If only the
# lower-cased form is a key this would raise KeyError - confirm.
264 if header_text in blocklist or header_text.lower() in blocklist:
265 # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
# Uses the first <table> after the heading and skips its first <tr>
# (presumably the header row - confirm).
266 for line in header.find_all_next("table")[0].find_all("tr")[1:]:
267 blocklist[header_text].append(
269 "domain": tidyup.domain(line.find("span").text),
# [9:] drops the first nine characters of the span's title attribute
# (presumably a fixed label in front of the hash - confirm).
270 "hash" : tidyup.domain(line.find("span")["title"][9:]),
# The reason is read from the second <td> of the row.
271 "reason": tidyup.reason(line.find_all("td")[1].text),
275 print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
277 print("DEBUG: Returning blocklist for domain:", domain)
# Heading names are mapped to block levels; "Limited servers" and
# "Silenced servers" are concatenated into one "followers_only" list.
279 "reject" : blocklist["Suspended servers"],
280 "media_removal" : blocklist["Filtered media"],
281 "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],