]> git.mxchange.org Git - fba.git/blob - fba/networks/pleroma.py
ade05924bbebd80d9691bee356d929eaa2550d87
[fba.git] / fba / networks / pleroma.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18 import validators
19
20 import bs4
21
22 from fba import blacklist
23 from fba import blocks
24 from fba import config
25 from fba import fba
26 from fba import federation
27 from fba import instances
28 from fba import network
29
30 from fba.helpers import tidyup
31
32 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
33     # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
34     if not isinstance(domain, str):
35         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
36     elif domain == "":
37         raise ValueError("Parameter 'domain' is empty")
38     elif not isinstance(origin, str) and origin is not None:
39         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
40     elif origin == "":
41         raise ValueError("Parameter 'origin' is empty")
42     elif not isinstance(nodeinfo_url, str):
43         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
44     elif nodeinfo_url == "":
45         raise ValueError("Parameter 'nodeinfo_url' is empty")
46
47     # Blocks
48     blockdict = list()
49     rows = None
50     try:
51         rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
52     except network.exceptions as exception:
53         print(f"WARNING: Exception '{type(exception)}' during fetching nodeinfo")
54
55     if rows is None:
56         print("WARNING: Could not fetch nodeinfo from domain:", domain)
57         return
58     elif "metadata" not in rows:
59         print(f"WARNING: rows()={len(rows)} does not have key 'metadata', domain='{domain}'")
60         return
61     elif "federation" not in rows["metadata"]:
62         print(f"WARNING: rows()={len(rows['metadata'])} does not have key 'federation', domain='{domain}'")
63         return
64
65     # DEBUG: print("DEBUG: Updating nodeinfo:", domain)
66     instances.update_last_nodeinfo(domain)
67
68     data = rows["metadata"]["federation"]
69
70     if "mrf_simple" in data:
71         # DEBUG: print("DEBUG: Found mrf_simple:", domain)
72         for block_level, blocklist in (
73             {
74                 **data["mrf_simple"],
75                 **{
76                     "quarantined_instances": data["quarantined_instances"]
77                 }
78             }
79         ).items():
80             # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
81             block_level = tidyup.domain(block_level)
82             # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
83
84             if block_level == "":
85                 print("WARNING: block_level is now empty!")
86                 continue
87
88             # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
89             if len(blocklist) > 0:
90                 for blocked in blocklist:
91                     # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
92                     blocked = tidyup.domain(blocked)
93                     # DEBUG: print("DEBUG: AFTER blocked:", blocked)
94
95                     if blocked == "":
96                         print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
97                         continue
98                     elif blacklist.is_blacklisted(blocked):
99                         # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
100                         continue
101                     elif blocked.count("*") > 1:
102                         # -ACK!-oma also started obscuring domains without hash
103                         fba.cursor.execute(
104                             "SELECT domain, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
105                         )
106                         searchres = fba.cursor.fetchone()
107
108                         # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
109                         if searchres is None:
110                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
111                             continue
112
113                         blocked      = searchres[0]
114                         nodeinfo_url = searchres[1]
115                         # DEBUG: print("DEBUG: Looked up domain:", blocked)
116                     elif not validators.domain(blocked):
117                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
118                         continue
119                     elif blocked.split(".")[-1] == "arpa":
120                         print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
121                         continue
122
123                     # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
124                     if not instances.is_registered(blocked):
125                         # Commit changes
126                         fba.connection.commit()
127
128                         # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
129                         instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
130
131                     if not blocks.is_instance_blocked(domain, blocked, block_level):
132                         # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
133                         blocks.add_instance(domain, blocked, "unknown", block_level)
134
135                         if block_level == "reject":
136                             # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
137                             blockdict.append({
138                                     "blocked": blocked,
139                                     "reason" : None
140                             })
141                         else:
142                             # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
143                             blocks.update_last_seen(domain, blocked, block_level)
144
145     # DEBUG: print("DEBUG: Committing changes ...")
146     fba.connection.commit()
147
148     # Reasons
149     if "mrf_simple_info" in data:
150         # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
151         for block_level, info in (
152             {
153                 **data["mrf_simple_info"],
154                 **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
155             }
156         ).items():
157             # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
158             block_level = tidyup.domain(block_level)
159             # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
160
161             if block_level == "":
162                 print("WARNING: block_level is now empty!")
163                 continue
164
165             # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
166             for blocked, reason in info.items():
167                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
168                 blocked = tidyup.domain(blocked)
169
170                 if isinstance(reason, str):
171                     # DEBUG: print("DEBUG: reason[] is a string")
172                     reason = tidyup.reason(reason)
173                 elif isinstance(reason, dict) and "reason" in reason:
174                     # DEBUG: print("DEBUG: reason[] is a dict")
175                     reason = tidyup.reason(reason["reason"])
176                 elif reason is not None:
177                     raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
178
179                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
180
181                 if blocked == "":
182                     print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
183                     continue
184                 elif not validators.domain(blocked):
185                     print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - skipped!")
186                     continue
187                 elif blacklist.is_blacklisted(blocked):
188                     # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
189                     continue
190                 elif blocked.count("*") > 1:
191                     # same domain guess as above, but for reasons field
192                     fba.cursor.execute(
193                         "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
194                     )
195                     searchres = fba.cursor.fetchone()
196
197                     # DEBUG: print(f"DEBUG: searchres[]='{type(searchres)}'")
198                     if searchres is None:
199                         print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
200                         continue
201
202                     blocked = searchres[0]
203                     origin = searchres[1]
204                     nodeinfo_url = searchres[2]
205
206                 # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
207                 if blocked.split(".")[-1] == "arpa":
208                     print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
209                     continue
210                 elif not instances.is_registered(blocked):
211                     # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodein
212                     instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
213
214                 # DEBUG: print(f"DEBUG: Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
215                 blocks.update_reason(reason, domain, blocked, block_level)
216
217                 # DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
218                 for entry in blockdict:
219                     if entry["blocked"] == blocked:
220                         # DEBUG: print(f"DEBUG: Updating entry reason: blocked='{blocked}',reason='{reason}'")
221                         entry["reason"] = reason
222
223     fba.connection.commit()
224     # DEBUG: print("DEBUG: EXIT!")