]> git.mxchange.org Git - fba.git/blob - fba/networks/pleroma.py
Continued:
[fba.git] / fba / networks / pleroma.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18 import validators
19
20 from fba import blacklist
21 from fba import blocks
22 from fba import fba
23 from fba import federation
24 from fba import instances
25 from fba import network
26
27 from fba.helpers import tidyup
28
29 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
30     # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
31     if not isinstance(domain, str):
32         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
33     elif domain == "":
34         raise ValueError("Parameter 'domain' is empty")
35     elif not isinstance(origin, str) and origin is not None:
36         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
37     elif origin == "":
38         raise ValueError("Parameter 'origin' is empty")
39     elif not isinstance(nodeinfo_url, str):
40         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
41     elif nodeinfo_url == "":
42         raise ValueError("Parameter 'nodeinfo_url' is empty")
43
44     # Blocks
45     blockdict = list()
46     rows = None
47     try:
48         rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
49     except network.exceptions as exception:
50         print(f"WARNING: Exception '{type(exception)}' during fetching nodeinfo")
51
52     if rows is None:
53         print("WARNING: Could not fetch nodeinfo from domain:", domain)
54         return
55     elif "metadata" not in rows:
56         print(f"WARNING: rows()={len(rows)} does not have key 'metadata', domain='{domain}'")
57         return
58     elif "federation" not in rows["metadata"]:
59         print(f"WARNING: rows()={len(rows['metadata'])} does not have key 'federation', domain='{domain}'")
60         return
61
62     # DEBUG: print("DEBUG: Updating nodeinfo:", domain)
63     instances.update_last_nodeinfo(domain)
64
65     data = rows["metadata"]["federation"]
66
67     if "mrf_simple" in data:
68         # DEBUG: print("DEBUG: Found mrf_simple:", domain)
69         for block_level, blocklist in (
70             {
71                 **data["mrf_simple"],
72                 **{
73                     "quarantined_instances": data["quarantined_instances"]
74                 }
75             }
76         ).items():
77             # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
78             block_level = tidyup.domain(block_level)
79             # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
80
81             if block_level == "":
82                 print("WARNING: block_level is now empty!")
83                 continue
84
85             # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
86             if len(blocklist) > 0:
87                 for blocked in blocklist:
88                     # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
89                     blocked = tidyup.domain(blocked)
90                     # DEBUG: print("DEBUG: AFTER blocked:", blocked)
91
92                     if blocked == "":
93                         print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
94                         continue
95                     elif blacklist.is_blacklisted(blocked):
96                         # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
97                         continue
98                     elif blocked.count("*") > 0:
99                         # Obsured domain name with no hash
100                         # DEBUG: print(f"DEBUG: Trying to de-obscure blocked='{blocked}' ...")
101                         fba.cursor.execute(
102                             "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
103                         )
104                         searchres = fba.cursor.fetchone()
105
106                         # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
107                         if searchres is None:
108                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
109                             continue
110
111                         # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{searchres[0]}'")
112                         blocked      = searchres[0]
113                         nodeinfo_url = searchres[1]
114
115                     # DEBUG: print(f"DEBUG: blocked='{blocked}'")
116                     if not validators.domain(blocked):
117                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
118                         continue
119                     elif blocked.split(".")[-1] == "arpa":
120                         print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
121                         continue
122                     elif not instances.is_registered(blocked):
123                         # Commit changes
124                         fba.connection.commit()
125
126                         # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
127                         instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
128
129                     if not blocks.is_instance_blocked(domain, blocked, block_level):
130                         # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
131                         blocks.add_instance(domain, blocked, "unknown", block_level)
132
133                         if block_level == "reject":
134                             # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
135                             blockdict.append({
136                                     "blocked": blocked,
137                                     "reason" : None
138                             })
139                         else:
140                             # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
141                             blocks.update_last_seen(domain, blocked, block_level)
142
143     # DEBUG: print("DEBUG: Committing changes ...")
144     fba.connection.commit()
145
146     # Reasons
147     if "mrf_simple_info" in data:
148         # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
149         for block_level, info in (
150             {
151                 **data["mrf_simple_info"],
152                 **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
153             }
154         ).items():
155             # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
156             block_level = tidyup.domain(block_level)
157             # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
158
159             if block_level == "":
160                 print("WARNING: block_level is now empty!")
161                 continue
162
163             # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
164             for blocked, reason in info.items():
165                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
166                 blocked = tidyup.domain(blocked)
167
168                 if isinstance(reason, str):
169                     # DEBUG: print("DEBUG: reason[] is a string")
170                     reason = tidyup.reason(reason)
171                 elif isinstance(reason, dict) and "reason" in reason:
172                     # DEBUG: print("DEBUG: reason[] is a dict")
173                     reason = tidyup.reason(reason["reason"])
174                 elif reason is not None:
175                     raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
176
177                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
178
179                 if blocked == "":
180                     print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
181                     continue
182                 elif blacklist.is_blacklisted(blocked):
183                     # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
184                     continue
185                 elif blocked.count("*") > 0:
186                     # Obsured domain with no hash
187                     # DEBUG: print(f"DEBUG: Trying to de-obscure blocked='{blocked}' ...")
188                     fba.cursor.execute(
189                         "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
190                     )
191                     searchres = fba.cursor.fetchone()
192
193                     # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
194                     if searchres is None:
195                         print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
196                         continue
197
198                     # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{searchres[0]}'")
199                     blocked = searchres[0]
200                     origin = searchres[1]
201                     nodeinfo_url = searchres[2]
202                 elif not validators.domain(blocked):
203                     print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
204                     continue
205
206                 # DEBUG: print(f"DEBUG: blocked='{blocked}'")
207                 if blocked.split(".")[-1] == "arpa":
208                     print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
209                     continue
210                 elif not instances.is_registered(blocked):
211                     # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodein
212                     instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
213
214                 # DEBUG: print(f"DEBUG: Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
215                 blocks.update_reason(reason, domain, blocked, block_level)
216
217                 # DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
218                 for entry in blockdict:
219                     if entry["blocked"] == blocked:
220                         # DEBUG: print(f"DEBUG: Updating entry reason: blocked='{blocked}',reason='{reason}'")
221                         entry["reason"] = reason
222
223     fba.connection.commit()
224     # DEBUG: print("DEBUG: EXIT!")