1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
20 from fba import blacklist
21 from fba import blocks
24 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
25 print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
26 if type(domain) != str:
27 raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
29 raise ValueError(f"Parameter 'domain' is empty")
30 elif type(origin) != str and origin != None:
31 raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
33 raise ValueError(f"Parameter 'origin' is empty")
34 elif type(nodeinfo_url) != str:
35 raise ValueError(f"Parameter nodeinfo_url[]={type(nodeinfo_url)} is not 'str'")
36 elif nodeinfo_url == "":
37 raise ValueError(f"Parameter 'nodeinfo_url' is empty")
42 json = fba.fetch_nodeinfo(domain, nodeinfo_url)
45 print("WARNING: Could not fetch nodeinfo from domain:", domain)
47 elif not "metadata" in json:
48 print(f"WARNING: json()={len(json)} does not have key 'metadata', domain='{domain}'")
50 elif not "federation" in json["metadata"]:
51 print(f"WARNING: json()={len(json['metadata'])} does not have key 'federation', domain='{domain}'")
54 # DEBUG: print("DEBUG: Updating nodeinfo:", domain)
55 fba.update_last_nodeinfo(domain)
57 federation = json["metadata"]["federation"]
59 if "enabled" in federation:
60 # DEBUG: print("DEBUG: Instance has no block list to analyze:", domain)
63 if "mrf_simple" in federation:
64 for block_level, blocklist in (
65 {**federation["mrf_simple"],
66 **{"quarantined_instances": federation["quarantined_instances"]}}
68 # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
69 block_level = fba.tidyup_domain(block_level)
70 # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
73 print("WARNING: block_level is now empty!")
76 # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
77 for blocked in blocklist:
78 # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
79 blocked = fba.tidyup_domain(blocked)
80 # DEBUG: print("DEBUG: AFTER blocked:", blocked)
83 print("WARNING: blocked is empty after fba.tidyup_domain():", domain, block_level)
85 elif blacklist.is_blacklisted(blocked):
86 # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
88 elif blocked.count("*") > 1:
89 # -ACK!-oma also started obscuring domains without hash
91 "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
93 searchres = fba.cursor.fetchone()
94 # DEBUG: print("DEBUG: searchres[]:", type(searchres))
97 print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
100 blocked = searchres[0]
101 nodeinfo_url = searchres[1]
102 # DEBUG: print("DEBUG: Looked up domain:", blocked)
103 elif not validators.domain(blocked):
104 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
107 # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
108 if not validators.domain(blocked):
109 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
111 elif not fba.is_instance_registered(blocked):
112 # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
113 instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
115 if not blocks.is_instance_blocked(domain, blocked, block_level):
116 # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
117 blocks.add_instance(domain, blocked, "unknown", block_level)
119 if block_level == "reject":
120 # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
127 # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
128 blocks.update_last_seen(domain, blocked, block_level)
130 # DEBUG: print("DEBUG: Committing changes ...")
131 fba.connection.commit()
134 if "mrf_simple_info" in federation:
135 # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
136 for block_level, info in (
137 {**federation["mrf_simple_info"],
138 **(federation["quarantined_instances_info"]
139 if "quarantined_instances_info" in federation
142 # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
143 block_level = fba.tidyup_domain(block_level)
144 # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
146 if block_level == "":
147 print("WARNING: block_level is now empty!")
150 # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
151 for blocked, reason in info.items():
152 # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
153 blocked = fba.tidyup_domain(blocked)
154 # DEBUG: print("DEBUG: AFTER blocked:", blocked)
157 print("WARNING: blocked is empty after fba.tidyup_domain():", domain, block_level)
159 elif blacklist.is_blacklisted(blocked):
160 # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
162 elif blocked.count("*") > 1:
163 # same domain guess as above, but for reasons field
165 "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
167 searchres = fba.cursor.fetchone()
169 if searchres == None:
170 print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
173 blocked = searchres[0]
174 origin = searchres[1]
175 nodeinfo_url = searchres[2]
176 elif not validators.domain(blocked):
177 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
180 # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
181 if not validators.domain(blocked):
182 print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
184 elif not fba.is_instance_registered(blocked):
185 # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
186 instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
188 # DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
189 blocks.update_reason(reason["reason"], domain, blocked, block_level)
191 # DEBUG: print(f"DEBUG: blockdict()={count(blockdict)")
192 for entry in blockdict:
193 if entry["blocked"] == blocked:
194 # DEBUG: print("DEBUG: Updating entry reason:", blocked)
195 entry["reason"] = reason["reason"]
197 fba.connection.commit()
198 except Exception as e:
199 print(f"ERROR: domain='{domain}',software='pleroma',exception[{type(e)}]:'{str(e)}'")
201 # DEBUG: print("DEBUG: EXIT!")