]> git.mxchange.org Git - fba.git/blob - fba/networks/pleroma.py
9d6dab0c6d3ecdd7d68196cb0755c3c690fad49e
[fba.git] / fba / networks / pleroma.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18
19 import bs4
20 import validators
21
22 from fba import fba
23
24 from fba.helpers import blacklist
25 from fba.helpers import config
26 from fba.helpers import tidyup
27
28 from fba.http import federation
29 from fba.http import network
30
31 from fba.models import blocks
32 from fba.models import instances
33
# Translation table for section headings found on an instance's about page.
# fetch_blocks_from_about() replaces a heading found here with the mapped
# value before looking it up among the known block-list headings.
language_mapping = {
    # English -> English
    "Reject": "Suspended servers",
}
38
def _resolve_blocked(blocked: str, domain: str, block_level: str, origin: str, nodeinfo_url: str):
    """Sanitise one blocked-domain entry and decide whether it may be stored.

    Returns a ``(blocked, nodeinfo_url)`` tuple for a usable entry, or ``None``
    when the entry has to be skipped (a warning is printed where the original
    code printed one). ``origin`` is only used for the warning message.

    The caller's ``nodeinfo_url`` is never modified: when a name gets
    de-obscured the URL from the looked-up row is returned for *this* entry
    only, so it cannot leak into later entries.
    """
    blocked = tidyup.domain(blocked)

    if blocked == "":
        print("WARNING: blocked is empty after tidyup.domain():", domain, block_level)
        return None
    elif blacklist.is_blacklisted(blocked):
        return None

    # Obscured domain name with no hash: try "*" first, then "?" - only the
    # first wildcard character found is used for the lookup.
    for wildcard in ("*", "?"):
        if wildcard in blocked:
            row = instances.deobscure(wildcard, blocked)

            if row is None:
                print(f"WARNING: Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
                return None

            # NOTE(review): row is used as (domain, origin, nodeinfo_url),
            # exactly like the original code did - confirm against
            # instances.deobscure()
            blocked      = row[0]
            nodeinfo_url = row[2]
            break

    if not validators.domain(blocked):
        print(f"WARNING: blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
        return None
    elif blocked.endswith(".arpa"):
        print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
        return None
    elif blocked.endswith(".tld"):
        print(f"WARNING: blocked='{blocked}' is a fake domain, please don't crawl them!")
        return None
    elif blacklist.is_blacklisted(blocked):
        # A de-obscured name may turn out to be blacklisted
        return None

    return blocked, nodeinfo_url

def _register_if_unknown(blocked: str, domain: str, command: str, nodeinfo_url: str, commit_first: bool = False):
    """Add ``blocked`` as a new instance (found via ``domain``) when it is not registered yet."""
    if not instances.is_registered(blocked):
        if commit_first:
            # Commit changes
            fba.connection.commit()

        instances.add(blocked, domain, command, nodeinfo_url)

def _import_block_list(blocklist, block_level: str, domain: str, origin: str, nodeinfo_url: str, command: str):
    """Store every usable entry of ``blocklist`` as a block of ``domain`` with ``block_level``.

    New blocks are added without a reason, known ones get their last-seen
    timestamp refreshed.
    """
    for entry in blocklist:
        resolved = _resolve_blocked(entry, domain, block_level, origin, nodeinfo_url)
        if resolved is None:
            continue

        blocked, blocked_nodeinfo_url = resolved
        _register_if_unknown(blocked, domain, command, blocked_nodeinfo_url, commit_first=True)

        if not blocks.is_instance_blocked(domain, blocked, block_level):
            blocks.add_instance(domain, blocked, None, block_level)
        else:
            blocks.update_last_seen(domain, blocked, block_level)

def _update_block_reason(entry: str, reason, block_level: str, domain: str, origin: str, nodeinfo_url: str, command: str):
    """Attach ``reason`` to the block ``domain`` -> ``entry`` if the entry is usable."""
    resolved = _resolve_blocked(entry, domain, block_level, origin, nodeinfo_url)
    if resolved is None:
        return

    blocked, blocked_nodeinfo_url = resolved
    _register_if_unknown(blocked, domain, command, blocked_nodeinfo_url)
    blocks.update_reason(reason, domain, blocked, block_level)

def _normalize_reason(reason):
    """Return the tidied block reason; accepts ``None``, a string or a dict with key 'reason'.

    Raises ValueError for any other type.
    """
    if isinstance(reason, str):
        return tidyup.reason(reason)
    elif isinstance(reason, dict) and "reason" in reason:
        return tidyup.reason(reason["reason"])
    elif reason is not None:
        raise ValueError(f"Cannot handle reason[]='{type(reason)}'")

    return None

def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
    """Fetch the block lists published by a Pleroma instance and store them.

    The nodeinfo document is fetched through federation.fetch_nodeinfo() and
    its ``metadata.federation`` section is evaluated:

    1. ``mrf_simple`` (plus ``quarantined_instances`` when present), or
       ``quarantined_instances`` alone, provide the blocked domains.
    2. ``mrf_simple_info`` / ``quarantined_instances_info`` provide reasons.
    3. When none of these keys exist, the instance's about page is parsed
       through fetch_blocks_from_about() instead.

    Parameters:
        domain:       Domain of the instance whose blocks are fetched.
        origin:       Origin of that instance, may be None but not empty.
                      It is only used in warning messages here.
        nodeinfo_url: URL handed to federation.fetch_nodeinfo(); it is also
                      passed on when a blocked domain gets newly registered.

    Raises:
        ValueError: if a parameter has the wrong type or is empty, or if a
                    block reason has a type that cannot be handled.

    Returns nothing, results are written through the blocks/instances models
    and committed on fba.connection.
    """
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(origin, str) and origin is not None:
        raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
    elif origin == "":
        raise ValueError("Parameter 'origin' is empty")
    elif not isinstance(nodeinfo_url, str):
        raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
    elif nodeinfo_url == "":
        raise ValueError("Parameter 'nodeinfo_url' is empty")

    # Must be determined here (not inside a helper) so that newly added
    # instances are still recorded with this function's name.
    command = inspect.currentframe().f_code.co_name

    rows = None
    try:
        rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
    except network.exceptions as exception:
        print(f"WARNING: Exception '{type(exception)}' during fetching nodeinfo")
        instances.set_last_error(domain, exception)

    if rows is None:
        print("WARNING: Could not fetch nodeinfo from domain:", domain)
        return
    elif "metadata" not in rows:
        print(f"WARNING: rows()={len(rows)} does not have key 'metadata', domain='{domain}'")
        return
    elif "federation" not in rows["metadata"]:
        print(f"WARNING: rows()={len(rows['metadata'])} does not have key 'federation', domain='{domain}'")
        return

    data = rows["metadata"]["federation"]
    found = False

    # Blocked domains
    if "mrf_simple" in data:
        found = True

        block_lists = dict(data["mrf_simple"])
        # 'quarantined_instances' is optional, only merge it when it is there
        if "quarantined_instances" in data:
            block_lists["quarantined_instances"] = data["quarantined_instances"]

        for block_level, blocklist in block_lists.items():
            block_level = tidyup.domain(block_level)

            if block_level == "":
                print("WARNING: block_level is now empty!")
                continue
            elif block_level == "accept":
                # Accepted instances are no blocks
                continue

            _import_block_list(blocklist, block_level, domain, origin, nodeinfo_url, command)
    elif "quarantined_instances" in data:
        found = True
        _import_block_list(data["quarantined_instances"], "quarantined", domain, origin, nodeinfo_url, command)
    else:
        print(f"WARNING: Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='{domain}'")

    fba.connection.commit()

    # Reasons
    if "mrf_simple_info" in data:
        found = True

        reason_lists = {
            **data["mrf_simple_info"],
            **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
        }

        for block_level, info in reason_lists.items():
            block_level = tidyup.domain(block_level)

            if block_level == "":
                print("WARNING: block_level is now empty!")
                continue
            elif block_level == "accept":
                continue

            for entry, reason in info.items():
                reason = _normalize_reason(reason)
                _update_block_reason(entry, reason, block_level, domain, origin, nodeinfo_url, command)
    elif "quarantined_instances_info" in data and "quarantined_instances" in data["quarantined_instances_info"]:
        found = True
        block_level = "quarantined"

        reasons = data["quarantined_instances_info"]["quarantined_instances"]
        for entry, info in reasons.items():
            # The reason is taken from the value belonging to the untouched
            # key; one malformed entry must not abort the remaining ones.
            if not isinstance(info, dict) or "reason" not in info:
                print(f"WARNING: Cannot find blocked='{entry}' in rows()={len(reasons)},domain='{domain}'")
                continue

            _update_block_reason(entry, info["reason"], block_level, domain, origin, nodeinfo_url, command)
    else:
        print(f"WARNING: Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='{domain}'")

    if not found:
        # No usable JSON elements found, continue with the about page
        blocklist = fetch_blocks_from_about(domain)

        if blocklist:
            print(f"INFO: Checking {len(blocklist)} record(s) ...")
            for block_level, records in blocklist.items():
                for record in records:
                    reason = tidyup.reason(record["reason"])

                    resolved = _resolve_blocked(record["blocked"], domain, block_level, origin, nodeinfo_url)
                    if resolved is None:
                        continue

                    blocked, blocked_nodeinfo_url = resolved
                    _register_if_unknown(blocked, domain, command, blocked_nodeinfo_url)

                    if not blocks.is_instance_blocked(domain, blocked, block_level):
                        blocks.add_instance(domain, blocked, reason, block_level)
                    else:
                        blocks.update_reason(reason, domain, blocked, block_level)

    fba.connection.commit()
511
def fetch_blocks_from_about(domain: str) -> dict:
    """Parse the block tables from the about page of a Pleroma instance.

    The page '/instance/about/index.html' is fetched from ``domain`` and every
    <h2> heading is matched (after translation through language_mapping, and
    ignoring upper/lower case) against the known block-list headings. The
    rows of the table following a matching heading - except its first row -
    are collected as blocks.

    Parameters:
        domain: Domain of the instance to fetch the page from.

    Raises:
        ValueError: if ``domain`` is not a string or is empty.

    Returns:
        A dict with the keys 'reject', 'media_removal' and 'followers_only',
        each holding a list of {'blocked': ..., 'reason': ...} dicts. All
        lists are empty when the page could not be fetched.
    """
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    doc = None
    for path in ["/instance/about/index.html"]:
        try:
            # Resetting doc type
            doc = None

            response = network.fetch_response(
                domain,
                path,
                network.web_headers,
                (config.get("connection_timeout"), config.get("read_timeout"))
            )

            if not response.ok or response.text.strip() == "":
                print(f"WARNING: path='{path}' does not exist on domain='{domain}' - SKIPPED!")
                continue

            doc = bs4.BeautifulSoup(
                response.text,
                "html.parser",
            )

            if doc.find("h2") is not None:
                # Found a page with headings, no need to try further paths
                break

        except network.exceptions as exception:
            print("ERROR: Cannot fetch from domain:", domain, exception)
            instances.set_last_error(domain, exception)
            break

    blocklist = {
        "Suspended servers": [],
        "Filtered media"   : [],
        "Limited servers"  : [],
        "Silenced servers" : [],
    }

    # Lower-cased heading -> canonical key, for case-insensitive matching
    known_headers = {key.lower(): key for key in blocklist}

    if doc is None:
        print(f"WARNING: Cannot fetch any /about pages for domain='{domain}' - EXIT!")
    else:
        for header in doc.find_all("h2"):
            header_text = tidyup.reason(header.text)

            if header_text in language_mapping:
                header_text = language_mapping[header_text]
            else:
                print(f"WARNING: header_text='{header_text}' not found in language mapping table")

            key = known_headers.get(header_text.lower())
            if key is None:
                print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
                continue

            # find_next() instead of a sibling search to account for instances
            # that e.g. hide lists in dropdown menu
            table = header.find_next("table")
            if table is None:
                print(f"WARNING: No table found after header_text='{header_text}' on domain='{domain}' - SKIPPED!")
                continue

            # The first row is skipped, presumably it is the table's header row
            for line in table.find_all("tr")[1:]:
                cells = line.find_all("td")
                if len(cells) < 2:
                    # Row without both a domain and a reason cell
                    continue

                blocklist[key].append({
                    "blocked": tidyup.domain(cells[0].text),
                    "reason" : tidyup.reason(cells[1].text),
                })

    # Always return the same shape, also when no page could be fetched
    return {
        "reject"        : blocklist["Suspended servers"],
        "media_removal" : blocklist["Filtered media"],
        "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
    }