]> git.mxchange.org Git - fba.git/blob - fetch_cs.py
Continued:
[fba.git] / fetch_cs.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
5 # Copyright (C) 2023 Free Software Foundation
6 #
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published
9 # by the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU Affero General Public License for more details.
16 #
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
19
20 import bs4
21 import json
22 import reqto
23 import sys
24 import validators
25 from fba import *
26
def find_domains(tag: bs4.element.Tag) -> list:
    """Collect domain/reason pairs from the rows of an HTML table.

    Every ``<tr>`` below ``tag`` that contains at least one ``<td>`` is
    inspected: the text of the first cell is passed through
    ``fba.tidyup_domain()`` and the text of the second cell through
    ``fba.tidyup_reason()``.

    Args:
        tag: The table element whose rows should be scanned.

    Returns:
        A list of dicts, each with the keys ``"domain"`` and ``"reason"``.
        ``"reason"`` is ``None`` for rows that have no second cell.

    Raises:
        ValueError: If ``tag`` is not a ``bs4.element.Tag``.
        KeyError: If ``tag`` contains no ``<tr>`` elements at all.
    """
    # DEBUG: print(f"DEBUG: tag[]={type(tag)} - CALLED!")
    if not isinstance(tag, bs4.element.Tag):
        raise ValueError(f"Parameter tag[]={type(tag)} is not type of bs4.element.Tag")

    # Select the rows once and reuse them for both the check and the loop.
    rows = tag.select("tr")
    if len(rows) == 0:
        raise KeyError("No table rows found in table!")

    domains = list()
    for element in rows:
        # DEBUG: print(f"DEBUG: element[]={type(element)}")
        cells = element.find_all("td")
        if len(cells) == 0:
            # Rows without <td> (e.g. header rows using <th>) carry no data.
            # DEBUG: print("DEBUG: Skipping element, no <td> found")
            continue

        domain = fba.tidyup_domain(cells[0].text)

        # A row with a single cell has no reason column; indexing cells[1]
        # unconditionally would abort the whole run with an IndexError.
        # NOTE(review): None is used as "no reason given" - confirm that
        # fba.block_instance() accepts it.
        reason = fba.tidyup_reason(cells[1].text) if len(cells) > 1 else None

        # DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'")

        if fba.is_blacklisted(domain):
            print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
            continue
        elif domain == "gab.com/.ai, develop.gab.com":
            # This one cell lists three domains at once, expand it by hand.
            print("DEBUG: Multiple domains detected in one row")
            for gab_domain in ("gab.com", "gab.ai", "develop.gab.com"):
                domains.append({
                    "domain": gab_domain,
                    "reason": reason,
                })
            continue
        elif not validators.domain(domain):
            print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
            continue

        # DEBUG: print(f"DEBUG: Adding domain='{domain}' ...")
        domains.append({
            "domain": domain,
            "reason": reason,
        })

    # DEBUG: print(f"DEBUG: domains()={len(domains)} - EXIT!")
    return domains
78
# Rows scraped from chaos.social's federation page, grouped by block level.
# Each key doubles as the prefix of the "<level>-instances" heading id that
# precedes the matching table on that page.
domains = {
    "silenced": list(),
    "blocked": list(),
}

try:
    doc = bs4.BeautifulSoup(
        reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(config.get("connection_timeout"), config.get("read_timeout"))).text,
        "html.parser",
    )
    # DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")

    for block_level in domains:
        heading = doc.find("h2", {"id": f"{block_level}-instances"})
        if heading is None:
            # Name the missing heading instead of failing later with an
            # AttributeError on None.
            raise LookupError(f"Cannot find heading with id='{block_level}-instances'")

        # The table with the instances is the first one after the heading.
        table = heading.find_next("table")

        # DEBUG: print(f"DEBUG: block_level='{block_level}',table[]={type(table)}")
        domains[block_level] = domains[block_level] + find_domains(table)

except Exception as e:
    # Only real errors are handled here; KeyboardInterrupt and SystemExit are
    # not subclasses of Exception and therefore propagate unchanged.
    print(f"ERROR: Cannot fetch from meta.chaos.social,exception[{type(e)}]:'{str(e)}'")
    sys.exit(255)
102
# DEBUG: print(f"DEBUG: domains()={len(domains)}")

# len(domains) only counts the block-level keys and is therefore never zero;
# count the collected rows instead so the guard and the message are accurate.
total_rows = sum(len(rows) for rows in domains.values())

if total_rows > 0:
    boot.acquire_lock()

    print(f"INFO: Adding {total_rows} new instances ...")
    for block_level, rows in domains.items():
        # DEBUG: print(f"DEBUG: block_level='{block_level}'")

        for row in rows:
            # DEBUG: print(f"DEBUG: row='{row}'")
            if not fba.is_instance_registered(row["domain"]):
                print(f"INFO: Fetching instances from domain='{row['domain']}' ...")
                fba.fetch_instances(row["domain"], None, None, sys.argv[0])

            # Record the block only if it is not already stored for chaos.social.
            if not fba.is_instance_blocked('chaos.social', row["domain"], block_level):
                # DEBUG: print(f"DEBUG: domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
                fba.block_instance('chaos.social', row["domain"], row["reason"], block_level)

    # DEBUG: print("DEBUG: Committing changes ...")
    fba.connection.commit()

boot.shutdown()