# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
# Copyright (C) 2023 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

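"""Pleroma-specific block list fetching.

This module reads a Pleroma instance's nodeinfo metadata (MRF policy and
quarantined instances) and, as a fallback, scrapes the instance's /about
page to collect blocked domains, block levels and reasons.
"""
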
import logging

import bs4

from fba import database
from fba import utils

from fba.helpers import config
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup

from fba.http import network
from fba.http import nodeinfo

from fba.models import blocks
from fba.models import instances

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Language mapping X -> English
language_mapping = {
    # English -> English
    "filtered media"   : "filtered_media",
    "limited servers"  : "followers_only",
    "followers-only"   : "followers_only",
    "media removal"    : "media_removal",
    "media_removal"    : "media_removal",
    "media force-set as sensitive": "media_nsfw",
    "nsfw"             : "media_nsfw",
    "reject"           : "reject",
    "suspended servers": "reject",
    "silenced servers" : "silenced",
    "removal from \"the whole known network\" timeline": "federated_timeline_removal",
}

def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
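    """Fetch block lists for a Pleroma instance.

    Queries the given nodeinfo URL and extracts "mrf_simple" /
    "quarantined_instances" entries (plus the matching *_info reasons); if
    the nodeinfo metadata contains nothing useful, falls back to scraping
    the instance's /about page. Returns a list of dicts with the keys
    "blocker", "blocked", "reason" and "block_level".
    """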
    logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
    domain_helper.raise_on(domain)

    if not isinstance(nodeinfo_url, str):
        raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
    elif nodeinfo_url == "":
        raise ValueError("Parameter 'nodeinfo_url' is empty")

    blockdict = list()
    rows = None
    try:
        logger.debug("Fetching nodeinfo: domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
        rows = nodeinfo.fetch_nodeinfo(domain, nodeinfo_url)

        if "error_message" in rows:
            logger.warning("Error message '%s' during fetching nodeinfo for domain='%s',nodeinfo_url='%s'", rows["error_message"], domain, nodeinfo_url)
            instances.set_last_error(domain, rows)

            logger.debug("Returning empty list ... - EXIT!")
            return list()
        elif "exception" in rows:
            logger.warning("Exception '%s' during fetching nodeinfo for domain='%s',nodeinfo_url='%s' - EXIT!", type(rows["exception"]), domain, nodeinfo_url)
            return list()
        elif "json" in rows:
            logger.debug("rows[json] found for domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
            rows = rows["json"]

    except network.exceptions as exception:
        logger.warning("Exception '%s' during fetching nodeinfo from domain='%s'", type(exception), domain)
        instances.set_last_error(domain, exception)

    if rows is None:
        logger.warning("Could not fetch nodeinfo from domain='%s' - EXIT!", domain)
        return list()
    elif "metadata" not in rows:
        logger.warning("rows()=%d does not have key 'metadata', domain='%s' - EXIT!", len(rows), domain)
        return list()
    elif "federation" not in rows["metadata"]:
        logger.warning("rows()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain)
        return list()

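    # Pleroma publishes its federation policy under metadata.federation:
    # "mrf_simple" maps block levels (reject, media_removal, ...) to lists of
    # domains, "quarantined_instances" is a plain domain list, and the
    # matching *_info keys carry human-readable reasons per domain.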
    data = rows["metadata"]["federation"]
    found = False

    logger.debug("data[]='%s'", type(data))
    if "mrf_simple" in data:
        logger.debug("Found mrf_simple in API response from domain='%s'", domain)
        found = True
        # Merge the MRF policy with the quarantined instances list; the latter
        # may be missing, so fall back to an empty list.
        for block_level, blocklist in (
            {
                **data["mrf_simple"],
                **{
                    "quarantined_instances": data.get("quarantined_instances", [])
                }
            }
        ).items():
            logger.debug("block_level='%s', blocklist()=%d", block_level, len(blocklist))
            block_level = tidyup.domain(block_level)
            logger.debug("block_level='%s' - AFTER!", block_level)

            if block_level == "":
                logger.warning("block_level is now empty!")
                continue
            elif block_level == "accept":
                logger.debug("domain='%s' skipping block_level='accept'", domain)
                continue

            block_level = blocks.alias_block_level(block_level)

            logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(blocklist), domain, block_level)
            if len(blocklist) > 0:
                for blocked in blocklist:
                    logger.debug("blocked='%s' - BEFORE!", blocked)
                    blocked = tidyup.domain(blocked)
                    logger.debug("blocked='%s' - AFTER!", blocked)

                    if blocked == "":
                        logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level)
                        continue
                    elif not domain_helper.is_wanted(blocked):
                        logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                        continue

                    logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
                    blocked = utils.deobfuscate(blocked, domain)

                    logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
                    if not domain_helper.is_wanted(blocked):
                        logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                        continue

                    logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
                    blockdict.append({
                        "blocker"    : domain,
                        "blocked"    : blocked,
                        "reason"     : None,
                        "block_level": block_level,
                    })

    elif "quarantined_instances" in data:
        logger.debug("Found 'quarantined_instances' in JSON response: domain='%s'", domain)
        found = True
        block_level = "quarantined"

        for blocked in data["quarantined_instances"]:
            logger.debug("blocked='%s' - BEFORE!", blocked)
            blocked = tidyup.domain(blocked)
            logger.debug("blocked='%s' - AFTER!", blocked)

            if blocked == "":
                logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                continue
            elif not domain_helper.is_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
            blocked = utils.deobfuscate(blocked, domain)

            logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
            if not domain_helper.is_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
            blockdict.append({
                "blocker"    : domain,
                "blocked"    : blocked,
                "reason"     : None,
                "block_level": block_level,
            })

    else:
        logger.warning("Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='%s'", domain)

    logger.debug("Invoking commit() ...")
    database.connection.commit()

    # Reasons: attach the human-readable reasons from the *_info metadata to
    # the block entries collected above.
    if "mrf_simple_info" in data:
        logger.debug("Found mrf_simple_info in API response: domain='%s'", domain)
        found = True
        for block_level, info in (
            {
                **data["mrf_simple_info"],
                **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
            }
        ).items():
            logger.debug("block_level='%s', info.items()=%d", block_level, len(info.items()))
            block_level = tidyup.domain(block_level)
            logger.debug("block_level='%s' - AFTER!", block_level)

            if block_level == "":
                logger.warning("block_level is now empty!")
                continue
            elif block_level == "accept":
                logger.debug("domain='%s': Skipping block_level='%s' ...", domain, block_level)
                continue

            block_level = blocks.alias_block_level(block_level)

            logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(info.items()), domain, block_level)
            for blocked, reason in info.items():
                logger.debug("blocked='%s',reason[%s]='%s' - BEFORE!", blocked, type(reason), reason)
                blocked = tidyup.domain(blocked)
                logger.debug("blocked='%s' - AFTER!", blocked)

                if isinstance(reason, str):
                    logger.debug("reason[] is a string")
                    reason = tidyup.reason(reason)
                elif isinstance(reason, dict) and "reason" in reason:
                    logger.debug("reason[] is a dict")
                    reason = tidyup.reason(reason["reason"]) if isinstance(reason["reason"], str) else None
                elif reason is not None:
                    raise ValueError(f"Cannot handle reason[]='{type(reason)}'")

                logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)

                if blocked == "":
                    logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                    continue
                elif not domain_helper.is_wanted(blocked):
                    logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                    continue

                logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
                blocked = utils.deobfuscate(blocked, domain)
                logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)

                logger.debug("Checking %d blockdict records ...", len(blockdict))
                for block in blockdict:
                    logger.debug("block[blocked]='%s',blocked='%s'", block["blocked"], blocked)
                    if block["blocked"] == blocked:
                        logger.debug("Updating reason='%s' for blocked='%s'", reason, block["blocked"])
                        block["reason"] = reason

    elif "quarantined_instances_info" in data and "quarantined_instances" in data["quarantined_instances_info"]:
        logger.debug("Found 'quarantined_instances_info' in JSON response: domain='%s'", domain)
        found = True
        block_level = "quarantined"

        rows = data["quarantined_instances_info"]["quarantined_instances"]
        for blocked in rows:
            logger.debug("blocked='%s' - BEFORE!", blocked)

            # Guard against entries without a "reason" key before accessing it
            if "reason" not in rows[blocked]:
                logger.warning("Cannot find 'reason' for blocked='%s' in rows()=%d,domain='%s' - BREAK!", blocked, len(rows), domain)
                break

            reason  = tidyup.reason(rows[blocked]["reason"])
            blocked = tidyup.domain(blocked)
            logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)

            if blocked == "":
                logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                continue
            elif not domain_helper.is_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
            blocked = utils.deobfuscate(blocked, domain)

            logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
            if not domain_helper.is_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Checking %d blockdict records ...", len(blockdict))
            for block in blockdict:
                logger.debug("block[blocked]='%s',blocked='%s'", block["blocked"], blocked)
                if block["blocked"] == blocked:
                    logger.debug("Updating reason='%s' for blocked='%s'", reason, block["blocked"])
                    block["reason"] = reason
    else:
        logger.warning("Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='%s'", domain)

    if not found:
        logger.debug("Did not find any usable JSON elements, domain='%s', continuing with /about page ...", domain)
        blocklist = fetch_blocks_from_about(domain)

        logger.debug("blocklist()=%d", len(blocklist))
        if len(blocklist) > 0:
            logger.info("Checking %d different blocklists ...", len(blocklist))
            for block_level in blocklist:
                logger.debug("block_level='%s'", block_level)
                rows = blocklist[block_level]

                logger.debug("rows[%s]()=%d", type(rows), len(rows))
                for block in rows:
                    logger.debug("Invoking utils.deobfuscate(%s, %s) ...", block["blocked"], domain)
                    block["blocked"] = utils.deobfuscate(block["blocked"], domain)

                    logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"])
                    if not domain_helper.is_wanted(block["blocked"]):
                        logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
                        continue

                    logger.debug("Appending blocker='%s',block[blocked]='%s',block[reason]='%s',block_level='%s' ...", domain, block["blocked"], block["reason"], block_level)
                    blockdict.append({
                        "blocker"    : domain,
                        "blocked"    : block["blocked"],
                        "reason"     : block["reason"],
                        "block_level": block_level,
                    })

    logger.debug("blockdict()=%d - EXIT!", len(blockdict))
    return blockdict

def fetch_blocks_from_about(domain: str) -> dict:
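    """Scrape block lists from a Pleroma instance's /about page.

    Used as a fallback when nodeinfo does not expose federation metadata.
    Returns a dict keyed by block level, each entry holding a list of dicts
    with the keys "blocked" and "reason".
    """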
    logger.debug("domain='%s' - CALLED!", domain)
    domain_helper.raise_on(domain)

    logger.debug("Fetching Pleroma blocks from domain='%s'", domain)
    doc = None
    for path in ["/instance/about/index.html"]:
        try:
            # Reset doc for each attempted path
            doc = None

            logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
            response = network.fetch_response(
                domain,
                path,
                network.web_headers,
                (config.get("connection_timeout"), config.get("read_timeout"))
            )

            logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
            if not response.ok or response.text.strip() == "":
                logger.warning("path='%s' does not exist on domain='%s' - SKIPPED!", path, domain)
                continue

            logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
            doc = bs4.BeautifulSoup(
                response.text,
                "html.parser",
            )

            logger.debug("doc[]='%s'", type(doc))
            if doc.find("h2") is not None:
                logger.debug("Found 'h2' header in path='%s' - BREAK!", path)
                break

        except network.exceptions as exception:
            logger.warning("Cannot fetch from domain='%s',exception[%s]='%s'", domain, type(exception), str(exception))
            instances.set_last_error(domain, exception)
            break

    blocklist = {
        "reject"        : [],
        "filtered_media": [],
        "followers_only": [],
        "silenced"      : [],
        "media_nsfw"    : [],
        "media_removal" : [],
        "federated_timeline_removal": [],
    }

    logger.debug("doc[]='%s'", type(doc))
    if doc is None:
        logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
        return {}

    headers = doc.find_all("h2")

    logger.debug("headers[]='%s'", type(headers))
    if len(headers) == 0:
        logger.warning("Cannot find any 'h2' headers in /about page for domain='%s' - EXIT!", domain)
        return {}

    logger.info("Checking %d headers ...", len(headers))
    for header in headers:
        logger.debug("header[%s]='%s'", type(header), header)
        block_level = tidyup.reason(header.text).lower()

        logger.debug("block_level='%s' - BEFORE!", block_level)
        if block_level in language_mapping:
            logger.debug("block_level='%s' - FOUND!", block_level)
            block_level = language_mapping[block_level].lower()
        else:
            logger.warning("block_level='%s' not found in language mapping table", block_level)

        logger.debug("block_level='%s' - AFTER!", block_level)
        if block_level in blocklist:
            logger.debug("Found block_level='%s', importing domain blocks ...", block_level)

            # The blocked domains are listed in the first table following the
            # header; skip the table's own header row.
            table = header.find_next("table")
            if table is None:
                logger.warning("No table found after header for block_level='%s',domain='%s' - SKIPPED!", block_level, domain)
                continue

            for line in table.find_all("tr")[1:]:
                logger.debug("line[]='%s'", type(line))
                columns = line.find_all("td")
                if len(columns) < 2:
                    logger.debug("Skipping row with %d column(s)", len(columns))
                    continue

                blocked = tidyup.domain(columns[0].text)
                reason  = tidyup.reason(columns[1].text)

                logger.debug("Appending block_level='%s',blocked='%s',reason='%s' ...", block_level, blocked, reason)
                blocklist[block_level].append({
                    "blocked": blocked,
                    "reason" : reason,
                })
        else:
            logger.warning("block_level='%s' not found in blocklist()=%d", block_level, len(blocklist))

    logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
    return blocklist