]> git.mxchange.org Git - fba.git/blob - fba/federation/mastodon.py
Fixed some issues found by pylint:
[fba.git] / fba / federation / mastodon.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import bs4
18 import inspect
19 import validators
20
21 from fba import blacklist
22 from fba import blocks
23 from fba import config
24 from fba import fba
25 from fba import instances
26 from fba import network
27
# Translation table for the section headings (<h3>) found on a Mastodon
# "/about/more" page. Keys are the headings as they appear on the page,
# values are the canonical English headings this module groups blocks by.
language_mapping = {
    # English (older wording) -> English
    "Silenced instances": "Silenced servers",
    "Suspended instances": "Suspended servers",
    "Limited instances": "Limited servers",
    "Filtered media": "Filtered media",

    # German -> English
    "Gesperrte Server": "Suspended servers",
    "Gefilterte Medien": "Filtered media",
    "Stummgeschaltete Server": "Silenced servers",

    # Japanese -> English
    "停止済みのサーバー": "Suspended servers",
    "制限中のサーバー": "Limited servers",
    "メディアを拒否しているサーバー": "Filtered media",
    "サイレンス済みのサーバー": "Silenced servers",

    # Hebrew -> English
    "שרתים מושעים": "Suspended servers",
    "מדיה מסוננת": "Filtered media",
    "שרתים מוגבלים": "Silenced servers",

    # French -> English
    "Serveurs suspendus": "Suspended servers",
    "Médias filtrés": "Filtered media",
    "Serveurs limités": "Limited servers",
    "Serveurs modérés": "Limited servers",
}
53
def fetch_blocks_from_about(domain: str) -> dict:
    """Scrape the block lists from the "/about/more" page of a Mastodon instance.

    Every <h3> heading on the page is translated through 'language_mapping';
    for each heading that names a known block list, the rows of the next
    <table> in the document are collected.

    Args:
        domain: Host name of the instance to scrape, must be a non-empty string.

    Returns:
        A dict with the keys "reject", "media_removal" and "followers_only",
        each holding a list of {"domain", "hash", "reason"} dicts. An empty
        dict is returned when the page could not be fetched or parsed.

    Raises:
        ValueError: If 'domain' is not a string or is empty.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    # DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
    blocklist = {
        "Suspended servers": [],
        "Filtered media"   : [],
        "Limited servers"  : [],
        "Silenced servers" : [],
    }

    try:
        doc = bs4.BeautifulSoup(
            network.fetch_response(domain, "/about/more", fba.headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
            "html.parser",
        )
    except Exception as exception:
        # Only 'Exception' is caught so that KeyboardInterrupt/SystemExit
        # still abort the run instead of being recorded as a fetch error.
        print("ERROR: Cannot fetch from domain:", domain, exception)
        instances.update_last_error(domain, exception)
        return {}

    for header in doc.find_all("h3"):
        header_text = fba.tidyup_reason(header.text)

        # DEBUG: print(f"DEBUG: header_text='{header_text}'")
        if header_text in language_mapping:
            header_text = language_mapping[header_text]
        else:
            print(f"WARNING: header_text='{header_text}' not found in language mapping table")

        if header_text not in blocklist:
            print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
            continue

        # The next table anywhere after the heading is used (not only a
        # sibling) to account for instances that e.g. hide the lists in a
        # dropdown menu.
        table = header.find_next("table")
        if table is None:
            print(f"WARNING: header_text='{header_text}' is not followed by a table, domain='{domain}'")
            continue

        # The first row is skipped, presumably it is the table header - TODO confirm
        for row in table.find_all("tr")[1:]:
            span = row.find("span")
            cells = row.find_all("td")

            if span is None or len(cells) < 2:
                print(f"WARNING: Skipping malformed row below header_text='{header_text}', domain='{domain}'")
                continue

            blocklist[header_text].append(
                {
                    "domain": fba.tidyup_domain(span.text),
                    # The first 9 characters of the title are dropped,
                    # presumably a prefix like "SHA-256: " - TODO confirm
                    "hash"  : fba.tidyup_domain(span["title"][9:]),
                    # Free text, so it gets the reason clean-up, not the domain one
                    "reason": fba.tidyup_reason(cells[1].text),
                }
            )

    # DEBUG: print("DEBUG: Returning blocklist for domain:", domain)
    return {
        "reject"        : blocklist["Suspended servers"],
        "media_removal" : blocklist["Filtered media"],
        "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
    }
108
def _fetch_blocks_from_api(domain: str) -> dict:
    """Fetch the block lists of 'domain' from the JSON API of newer Mastodon versions.

    Returns a dict mapping each block level ("reject", "media_removal",
    "followers_only", "report_removal") to a list of
    {"domain", "hash", "reason"} dicts. Any network or parsing error is
    passed on to the caller.
    """
    severity_levels = {
        "suspend"       : "reject",
        "silence"       : "followers_only",
        "reject_media"  : "media_removal",
        "reject_reports": "report_removal",
    }
    records = {
        "reject"        : [],
        "media_removal" : [],
        "followers_only": [],
        "report_removal": [],
    }
    timeout = (config.get("connection_timeout"), config.get("read_timeout"))

    # Handling CSRF: at least one server has been seen requiring the token
    # to access the endpoint.
    # DEBUG: print("DEBUG: Fetching meta:", domain)
    meta = bs4.BeautifulSoup(
        network.fetch_response(domain, "/", fba.headers, timeout).text,
        "html.parser",
    )
    token_tag = meta.find("meta", attrs={"name": "csrf-token"})
    if token_tag is not None and token_tag.has_attr("content"):
        # DEBUG: print("DEBUG: Adding CSRF token:", domain, token_tag["content"])
        reqheaders = {**fba.api_headers, "X-CSRF-Token": token_tag["content"]}
    else:
        # DEBUG: print("DEBUG: No CSRF token found, using normal headers:", domain)
        reqheaders = fba.api_headers

    # DEBUG: print("DEBUG: Querying API domain_blocks:", domain)
    rows = network.fetch_response(domain, "/api/v1/instance/domain_blocks", reqheaders, timeout).json()

    print(f"INFO: Checking {len(rows)} entries from domain='{domain}',software='mastodon' ...")
    for row in rows:
        entry = {
            'domain': row['domain'],
            'hash'  : row['digest'],
            'reason': row['comment']
        }

        block_level = severity_levels.get(row['severity'])
        if block_level is None:
            print("WARNING: Unknown severity:", row['severity'], row['domain'])
            continue

        # DEBUG: print(f"DEBUG: Adding entry='{entry}' with severity='{row['severity']}' ...")
        records[block_level].append(entry)

    return records

def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
    """Fetch the domain blocks of a Mastodon instance and store them.

    The JSON API is tried first; if that fails for any reason the blocks are
    scraped from the about page via fetch_blocks_from_about(). Every blocked
    domain is validated, registered through instances.add() when still
    unknown and then recorded (or refreshed) through the 'blocks' module.
    Changes are committed at the end; errors are printed, not raised.

    Args:
        domain: Host name of the blocking instance, non-empty string.
        origin: Non-empty string or None. Not used beyond validation here.
        nodeinfo_url: Non-empty string, handed to instances.add() for newly
            registered blocked domains.

    Raises:
        ValueError: If a parameter has the wrong type or is empty.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(origin, str) and origin is not None:
        raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
    elif origin == "":
        raise ValueError("Parameter 'origin' is empty")
    elif not isinstance(nodeinfo_url, str):
        raise ValueError(f"Parameter nodeinfo_url[]={type(nodeinfo_url)} is not 'str'")
    elif nodeinfo_url == "":
        raise ValueError("Parameter 'nodeinfo_url' is empty")

    try:
        # NOTE(review): collects newly added "reject" blocks but is not read
        # anywhere in the visible code - confirm whether it is still needed.
        blockdict = []

        try:
            # JSON endpoint for newer Mastodon versions
            records = _fetch_blocks_from_api(domain)
        except Exception as exception:
            # Only 'Exception' is caught so that Ctrl-C aborts instead of
            # triggering the fallback.
            # DEBUG: print(f"DEBUG: Failed, trying mastodon-specific fetches: domain='{domain}',exception[{type(exception)}]={str(exception)}")
            records = fetch_blocks_from_about(domain)

        print(f"INFO: Checking {len(records)} entries from domain='{domain}',software='mastodon' ...")
        for block_level, blocklist in records.items():
            # DEBUG: print("DEBUG: domain,block_level,blocklist():", domain, block_level, len(blocklist))
            block_level = fba.tidyup_domain(block_level)

            # DEBUG: print("DEBUG: AFTER-block_level:", block_level)
            if block_level == "":
                print("WARNING: block_level is empty, domain:", domain)
                continue

            # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon',block_level='{block_level}' ...")
            for block in blocklist:
                # Access by key instead of relying on the dict's value order
                blocked      = fba.tidyup_domain(block["domain"])
                blocked_hash = block["hash"]
                reason       = block["reason"]
                reason       = fba.tidyup_reason(reason) if reason is not None and reason != "" else None
                # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")

                # Kept per entry so that a value looked up for one blocked
                # domain does not leak into the following entries.
                blocked_nodeinfo_url = nodeinfo_url

                if blocked == "":
                    print("WARNING: blocked is empty:", domain)
                    continue
                elif blacklist.is_blacklisted(blocked):
                    # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
                    continue
                elif blocked.count("*") > 0:
                    # Obfuscated name: look the instance up by its hash instead
                    fba.cursor.execute(
                        "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash]
                    )
                    searchres = fba.cursor.fetchone()

                    if searchres is None:
                        print(f"WARNING: Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
                        continue

                    # DEBUG: print("DEBUG: Updating domain: ", searchres[0])
                    blocked = searchres[0]
                    blocked_nodeinfo_url = searchres[2]

                # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
                if not validators.domain(blocked):
                    print(f"WARNING: blocked='{blocked}',software='mastodon' is not a valid domain name - skipped!")
                    continue
                elif not instances.is_registered(blocked):
                    # DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}'")
                    instances.add(blocked, domain, inspect.currentframe().f_code.co_name, blocked_nodeinfo_url)

                blocking = blocked if blocked.count("*") <= 1 else blocked_hash
                # DEBUG: print(f"DEBUG: blocking='{blocking}',blocked='{blocked}',blocked_hash='{blocked_hash}'")

                if not blocks.is_instance_blocked(domain, blocked, block_level):
                    # DEBUG: print("DEBUG: Blocking:", domain, blocked, block_level)
                    blocks.add_instance(domain, blocking, reason, block_level)

                    if block_level == "reject":
                        blockdict.append({
                            "blocked": blocked,
                            "reason" : reason
                        })
                else:
                    # DEBUG: print(f"DEBUG: Updating block last seen and reason for domain='{domain}',blocking='{blocking}' ...")
                    blocks.update_last_seen(domain, blocking, block_level)
                    blocks.update_reason(reason, domain, blocking, block_level)

        # DEBUG: print("DEBUG: Committing changes ...")
        fba.connection.commit()
    except Exception as exception:
        print(f"ERROR: domain='{domain}',software='mastodon',exception[{type(exception)}]:'{str(exception)}'")

    # DEBUG: print("DEBUG: EXIT!")
262     # DEBUG: print("DEBUG: EXIT!")