]> git.mxchange.org Git - fba.git/blob - pleroma.py
a216634e412013e01572091cf9027e78cff3e7bd
[fba.git] / pleroma.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18 import validators
19
20 from fba import blacklist
21 from fba import blocks
22 from fba import fba
23 from fba import federation
24 from fba import instances
25 from fba import network
26
27 from fba.helpers import tidyup
28
29 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
30     # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
31     if not isinstance(domain, str):
32         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
33     elif domain == "":
34         raise ValueError("Parameter 'domain' is empty")
35     elif not isinstance(origin, str) and origin is not None:
36         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
37     elif origin == "":
38         raise ValueError("Parameter 'origin' is empty")
39     elif not isinstance(nodeinfo_url, str):
40         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
41     elif nodeinfo_url == "":
42         raise ValueError("Parameter 'nodeinfo_url' is empty")
43
44     # Blocks
45     blockdict = list()
46     rows = None
47     try:
48         rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
49     except network.exceptions as exception:
50         print(f"WARNING: Exception '{type(exception)}' during fetching nodeinfo")
51
52     if rows is None:
53         print("WARNING: Could not fetch nodeinfo from domain:", domain)
54         return
55     elif "metadata" not in rows:
56         print(f"WARNING: rows()={len(rows)} does not have key 'metadata', domain='{domain}'")
57         return
58     elif "federation" not in rows["metadata"]:
59         print(f"WARNING: rows()={len(rows['metadata'])} does not have key 'federation', domain='{domain}'")
60         return
61
62     # DEBUG: print("DEBUG: Updating nodeinfo:", domain)
63     instances.update_last_nodeinfo(domain)
64
65     data = rows["metadata"]["federation"]
66
67     if "mrf_simple" in data:
68         # DEBUG: print("DEBUG: Found mrf_simple:", domain)
69         for block_level, blocklist in (
70             {
71                 **data["mrf_simple"],
72                 **{
73                     "quarantined_instances": data["quarantined_instances"]
74                 }
75             }
76         ).items():
77             # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
78             block_level = tidyup.domain(block_level)
79             # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
80
81             if block_level == "":
82                 print("WARNING: block_level is now empty!")
83                 continue
84
85             # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
86             if len(blocklist) > 0:
87                 for blocked in blocklist:
88                     # DEBUG: print("DEBUG: BEFORE blocked:", blocked)
89                     blocked = tidyup.domain(blocked)
90                     # DEBUG: print("DEBUG: AFTER blocked:", blocked)
91
92                     if blocked == "":
93                         print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
94                         continue
95                     elif blacklist.is_blacklisted(blocked):
96                         # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
97                         continue
98                     elif blocked.count("*") > 0:
99                         # Obscured domain name with no hash
100                         row = instances.deobscure("*", blocked)
101
102                         # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
103                         if row is None:
104                             print(f"WARNING: Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
105                             continue
106
107                         # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{row[0]}'")
108                         blocked      = row[0]
109                         origin       = row[1]
110                         nodeinfo_url = row[2]
111
112                     # DEBUG: print(f"DEBUG: blocked='{blocked}'")
113                     if not validators.domain(blocked):
114                         print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
115                         continue
116                     elif blocked.split(".")[-1] == "arpa":
117                         print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
118                         continue
119                     elif not instances.is_registered(blocked):
120                         # Commit changes
121                         fba.connection.commit()
122
123                         # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
124                         instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
125
126                     if not blocks.is_instance_blocked(domain, blocked, block_level):
127                         # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
128                         blocks.add_instance(domain, blocked, "unknown", block_level)
129
130                         if block_level == "reject":
131                             # DEBUG: print("DEBUG: Adding to blockdict:", blocked)
132                             blockdict.append({
133                                     "blocked": blocked,
134                                     "reason" : None
135                             })
136                         else:
137                             # DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
138                             blocks.update_last_seen(domain, blocked, block_level)
139
140     # DEBUG: print("DEBUG: Committing changes ...")
141     fba.connection.commit()
142
143     # Reasons
144     if "mrf_simple_info" in data:
145         # DEBUG: print("DEBUG: Found mrf_simple_info:", domain)
146         for block_level, info in (
147             {
148                 **data["mrf_simple_info"],
149                 **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
150             }
151         ).items():
152             # DEBUG: print("DEBUG: block_level, info.items():", block_level, len(info.items()))
153             block_level = tidyup.domain(block_level)
154             # DEBUG: print("DEBUG: BEFORE block_level:", block_level)
155
156             if block_level == "":
157                 print("WARNING: block_level is now empty!")
158                 continue
159
160             # DEBUG: print(f"DEBUG: Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
161             for blocked, reason in info.items():
162                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
163                 blocked = tidyup.domain(blocked)
164
165                 if isinstance(reason, str):
166                     # DEBUG: print("DEBUG: reason[] is a string")
167                     reason = tidyup.reason(reason)
168                 elif isinstance(reason, dict) and "reason" in reason:
169                     # DEBUG: print("DEBUG: reason[] is a dict")
170                     reason = tidyup.reason(reason["reason"])
171                 elif reason is not None:
172                     raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
173
174                 # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
175
176                 if blocked == "":
177                     print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
178                     continue
179                 elif blacklist.is_blacklisted(blocked):
180                     # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
181                     continue
182                 elif blocked.count("*") > 0:
183                     # Obscured domain name with no hash
184                     row = instances.deobscure("*", blocked)
185
186                     # DEBUG: print(f"DEBUG: row[]='{type(row)}'")
187                     if row is None:
188                         print(f"WARNING: Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
189                         continue
190
191                     # DEBUG: print(f"DEBUG: blocked='{blocked}' de-obscured to '{row[0]}'")
192                     blocked      = row[0]
193                     origin       = row[1]
194                     nodeinfo_url = row[2]
195
196                 # DEBUG: print(f"DEBUG: blocked='{blocked}'")
197                 if not validators.domain(blocked):
198                     print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
199                     continue
200                 elif blocked.split(".")[-1] == "arpa":
201                     print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
202                     continue
203                 elif not instances.is_registered(blocked):
204                     # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodein
205                     instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
206
207                 # DEBUG: print(f"DEBUG: Updating block reason: reason='{reason}',domain='{domain}',blocked='{blocked}',block_level='{block_level}'")
208                 blocks.update_reason(reason, domain, blocked, block_level)
209
210                 # DEBUG: print(f"DEBUG: blockdict()={len(blockdict)}")
211                 for entry in blockdict:
212                     if entry["blocked"] == blocked:
213                         # DEBUG: print(f"DEBUG: Updating entry reason: blocked='{blocked}',reason='{reason}'")
214                         entry["reason"] = reason
215
216     fba.connection.commit()
217     # DEBUG: print("DEBUG: EXIT!")