]> git.mxchange.org Git - fba.git/blob - fba/federation/pleroma.py
06b7a4d869219875b5888392cfb4224d30e27102
[fba.git] / fba / federation / pleroma.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18 import validators
19
20 from fba import blacklist
21 from fba import blocks
22 from fba import fba
23 from fba import instances
24
25 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
26     # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
27     if type(domain) != str:
28         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
29     elif domain == "":
30         raise ValueError(f"Parameter 'domain' is empty")
31     elif type(origin) != str and origin != None:
32         raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
33     elif origin == "":
34         raise ValueError(f"Parameter 'origin' is empty")
35     elif type(nodeinfo_url) != str:
36         raise ValueError(f"Parameter nodeinfo_url[]={type(nodeinfo_url)} is not 'str'")
37     elif nodeinfo_url == "":
38         raise ValueError(f"Parameter 'nodeinfo_url' is empty")
39
40     try:
41         # Blocks
42         blockdict = list()
43         json = fba.fetch_nodeinfo(domain, nodeinfo_url)
44
45         if json is None:
46             print("WARNING: Could not fetch nodeinfo from domain:", domain)
47             return
48         elif not "metadata" in json:
49             print(f"WARNING: json()={len(json)} does not have key 'metadata', domain='{domain}'")
50             return
51         elif not "federation" in json["metadata"]:
52             print(f"WARNING: json()={len(json['metadata'])} does not have key 'federation', domain='{domain}'")
53             return
54
55         # DEBUG: print("DEBUG: Updating nodeinfo:", domain)
56         instances.update_last_nodeinfo(domain)
57
58         federation = json["metadata"]["federation"]
59
60         if "enabled" in federation:
61             # DEBUG: print("DEBUG: Instance has no block list to analyze:", domain)
62             return
63
64         if "mrf_simple" in federation:
65             for block_level, blocklist in (
66                 {**federation["mrf_simple"],
67                 **{"quarantined_instances": federation["quarantined_instances"]}}
68             ).items():
69                 # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
70                 block_level = fba.tidyup_domain(block_level)
71                 # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
72
73                 if block_level == "":
74                     print("WARNING: block_level is now empty!")
75                     continue
76
77                 # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
78                 for blocked in blocklist:
79                     # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
80                     blocked = fba.tidyup_domain(blocked)
81                     # DEBUG: print("DEBUG: AFTER blocked:", blocked)
82
83                     if blocked == "":
84                         print("WARNING: blocked is empty after fba.tidyup_domain():", domain, block_level)
85                         continue
86                     elif blacklist.is_blacklisted(blocked):
87                         # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
88                         continue
89                     elif blocked.count("*") > 1:
90                         # -ACK!-oma also started obscuring domains without hash
91                         fba.cursor.execute(
92                             "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
93                         )
94                         searchres = fba.cursor.fetchone()
95                         # DEBUG: print("DEBUG: searchres[]:", type(searchres))
96
97                         if searchres == None:
98                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
99                             continue
100
101                         blocked = searchres[0]
102                         nodeinfo_url = searchres[1]
103                         # DEBUG: print("DEBUG: Looked up domain:", blocked)
104                     elif not validators.domain(blocked):
105                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
106                         continue
107
108                     # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
109                     if not validators.domain(blocked):
110                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
111                         continue
112                     elif not instances.is_registered(blocked):
113                         # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
114                         instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
115
116                     if not blocks.is_instance_blocked(domain, blocked, block_level):
117                         # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
118                         blocks.add_instance(domain, blocked, "unknown", block_level)
119
120                         if block_level == "reject":
121                             # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
122                             blockdict.append(
123                                 {
124                                     "blocked": blocked,
125                                     "reason" : None
126                                 })
127                     else:
128                         # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
129                         blocks.update_last_seen(domain, blocked, block_level)
130
131         # DEBUG: print("DEBUG: Committing changes ...")
132         fba.connection.commit()
133
134         # Reasons
135         if "mrf_simple_info" in federation:
136             # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
137             for block_level, info in (
138                 {**federation["mrf_simple_info"],
139                 **(federation["quarantined_instances_info"]
140                 if "quarantined_instances_info" in federation
141                 else {})}
142             ).items():
143                 # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
144                 block_level = fba.tidyup_domain(block_level)
145                 # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
146
147                 if block_level == "":
148                     print("WARNING: block_level is now empty!")
149                     continue
150
151                 # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
152                 for blocked, reason in info.items():
153                     # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
154                     blocked = fba.tidyup_domain(blocked)
155                     reason  = fba.tidyup_reason(reason) if reason != None and reason != "" else None
156                     # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
157
158                     if blocked == "":
159                         print("WARNING: blocked is empty after fba.tidyup_domain():", domain, block_level)
160                         continue
161                     elif not validators.domain(blocked):
162                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
163                         continue
164                     elif blacklist.is_blacklisted(blocked):
165                         # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
166                         continue
167                     elif blocked.count("*") > 1:
168                         # same domain guess as above, but for reasons field
169                         fba.cursor.execute(
170                             "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
171                         )
172                         searchres = fba.cursor.fetchone()
173
174                         if searchres == None:
175                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
176                             continue
177
178                         blocked = searchres[0]
179                         origin = searchres[1]
180                         nodeinfo_url = searchres[2]
181
182                     # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
183                     if not instances.is_registered(blocked):
184                         # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
185                         instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
186
187                     # DEBUG: print("DEBUG: Updating block reason:", domain, blocked, reason["reason"])
188                     blocks.update_reason(reason["reason"], domain, blocked, block_level)
189
190                     # DEBUG: print(f"DEBUG: blockdict()={count(blockdict)")
191                     for entry in blockdict:
192                         if entry["blocked"] == blocked:
193                             # DEBUG: print("DEBUG: Updating entry reason:", blocked)
194                             entry["reason"] = reason["reason"]
195
196         fba.connection.commit()
197     except Exception as e:
198         print(f"ERROR: domain='{domain}',software='pleroma',exception[{type(e)}]:'{str(e)}'")
199
200     # DEBUG: print("DEBUG: EXIT!")