# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
# Copyright (C) 2023 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging

import bs4

from fba import database
from fba import utils

from fba.helpers import config
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup

from fba.http import federation
from fba.http import network

from fba.models import blocks
from fba.models import instances

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Language mapping X -> English
language_mapping = {
    # English -> English
    "filtered media"   : "filtered_media",
    "limited servers"  : "followers_only",
    "followers-only"   : "followers_only",
    "media removal"    : "media_removal",
    "media_removal"    : "media_removal",
    "media force-set as sensitive": "media_nsfw",
    "nsfw"             : "media_nsfw",
    "reject"           : "reject",
    "suspended servers": "reject",
    "silenced servers" : "silenced",
    "removal from \"the whole known network\" timeline": "federated_timeline_removal",
}
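
# Illustrative only: fetch_blocks_from_about() below lowercases and tidies each
# scraped <h2> header and then normalizes it through this table, e.g.
#   language_mapping["suspended servers"]  # -> "reject"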

def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
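    """
    Fetch blocked/limited instances for a Pleroma instance from its nodeinfo
    metadata ("mrf_simple", "quarantined_instances" and their "*_info"
    counterparts), falling back to scraping the /about page when no usable
    JSON element is found.

    Returns a list of dicts with the keys "blocker", "blocked", "reason" and
    "block_level".
    """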
    logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
    domain_helper.raise_on(domain)

    if not isinstance(nodeinfo_url, str):
        raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
    elif nodeinfo_url == "":
        raise ValueError("Parameter 'nodeinfo_url' is empty")

    blockdict = list()
    rows = None
    try:
        logger.debug("Fetching nodeinfo: domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
        rows = federation.fetch_nodeinfo(domain, nodeinfo_url)

        if "error_message" in rows:
            logger.warning("Error message '%s' during fetching nodeinfo for domain='%s',nodeinfo_url='%s' - EXIT!", rows["error_message"], domain, nodeinfo_url)
            instances.set_last_error(domain, rows)
            return list()
        elif "exception" in rows:
            logger.warning("Exception '%s' during fetching nodeinfo for domain='%s',nodeinfo_url='%s' - EXIT!", type(rows["exception"]), domain, nodeinfo_url)
            return list()
        elif "json" in rows:
            logger.debug("rows[json] found for domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
            rows = rows["json"]
    except network.exceptions as exception:
        logger.warning("Exception '%s' during fetching nodeinfo from domain='%s'", type(exception), domain)
        instances.set_last_error(domain, exception)

    if rows is None:
        logger.warning("Could not fetch nodeinfo from domain='%s'", domain)
        return list()
    elif "metadata" not in rows:
        logger.warning("rows()=%d does not have key 'metadata', domain='%s'", len(rows), domain)
        return list()
    elif "federation" not in rows["metadata"]:
        logger.warning("rows()=%d does not have key 'federation', domain='%s'", len(rows["metadata"]), domain)
        return list()

    data = rows["metadata"]["federation"]
    found = False

    logger.debug("data[]='%s'", type(data))
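    # Shape assumed by the code below (derived from the accesses made in this
    # function, not from any external specification):
    #
    #   data == {
    #       "mrf_simple"                 : {"reject": ["bad.example", ...], ...},
    #       "quarantined_instances"      : ["quarantined.example", ...],
    #       "mrf_simple_info"            : {"reject": {"bad.example": {"reason": "..."}}, ...},
    #       "quarantined_instances_info" : {"quarantined_instances": {"quarantined.example": {"reason": "..."}}},
    #   }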
    if "mrf_simple" in data:
        logger.debug("Found mrf_simple in API response from domain='%s'", domain)
        found = True
        for block_level, blocklist in (
            {
                **data["mrf_simple"],
                **{
                    # Guard against instances that do not expose quarantined_instances
                    "quarantined_instances": data["quarantined_instances"] if "quarantined_instances" in data else []
                }
            }
        ).items():
            logger.debug("block_level='%s', blocklist()=%d", block_level, len(blocklist))
            block_level = tidyup.domain(block_level)
            logger.debug("block_level='%s' - AFTER!", block_level)

            if block_level == "":
                logger.warning("block_level is now empty!")
                continue
            elif block_level == "accept":
                logger.debug("domain='%s' skipping block_level='accept'", domain)
                continue

            block_level = blocks.alias_block_level(block_level)

            logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(blocklist), domain, block_level)
            if len(blocklist) > 0:
                for blocked in blocklist:
                    logger.debug("blocked='%s' - BEFORE!", blocked)
                    blocked = tidyup.domain(blocked)
                    logger.debug("blocked='%s' - AFTER!", blocked)

                    if blocked == "":
                        logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level)
                        continue
                    elif not utils.is_domain_wanted(blocked):
                        logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                        continue

                    logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
                    blocked = utils.deobfuscate(blocked, domain)

                    logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
                    if not utils.is_domain_wanted(blocked):
                        logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                        continue

                    logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
                    blockdict.append({
                        "blocker"    : domain,
                        "blocked"    : blocked,
                        "reason"     : None,
                        "block_level": block_level,
                    })

    elif "quarantined_instances" in data:
        logger.debug("Found 'quarantined_instances' in JSON response: domain='%s'", domain)
        found = True
        block_level = "quarantined"

        for blocked in data["quarantined_instances"]:
            logger.debug("blocked='%s' - BEFORE!", blocked)
            blocked = tidyup.domain(blocked)
            logger.debug("blocked='%s' - AFTER!", blocked)

            if blocked == "":
                logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                continue
            elif not utils.is_domain_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
            blocked = utils.deobfuscate(blocked, domain)

            logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
            if not utils.is_domain_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Appending blocker='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
            blockdict.append({
                "blocker"    : domain,
                "blocked"    : blocked,
                "reason"     : None,
                "block_level": block_level,
            })

    else:
        logger.warning("Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='%s'", domain)

    logger.debug("Invoking commit() ...")
    database.connection.commit()

    # Reasons
    if "mrf_simple_info" in data:
        logger.debug("Found mrf_simple_info in API response: domain='%s'", domain)
        found = True
        for block_level, info in (
            {
                **data["mrf_simple_info"],
                **(data["quarantined_instances_info"] if "quarantined_instances_info" in data else {})
            }
        ).items():
            logger.debug("block_level='%s', info.items()=%d", block_level, len(info.items()))
            block_level = tidyup.domain(block_level)
            logger.debug("block_level='%s' - AFTER!", block_level)

            if block_level == "":
                logger.warning("block_level is now empty!")
                continue
            elif block_level == "accept":
                logger.debug("domain='%s': Skipping block_level='%s' ...", domain, block_level)
                continue

            block_level = blocks.alias_block_level(block_level)

            logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(info.items()), domain, block_level)
            for blocked, reason in info.items():
                logger.debug("blocked='%s',reason[%s]='%s' - BEFORE!", blocked, type(reason), reason)
                blocked = tidyup.domain(blocked)
                logger.debug("blocked='%s' - AFTER!", blocked)

                if isinstance(reason, str):
                    logger.debug("reason[] is a string")
                    reason = tidyup.reason(reason)
                elif isinstance(reason, dict) and "reason" in reason:
                    logger.debug("reason[] is a dict")
                    reason = tidyup.reason(reason["reason"]) if isinstance(reason["reason"], str) else None
                elif reason is not None:
                    raise ValueError(f"Cannot handle reason[]='{type(reason)}'")

                logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)

                if blocked == "":
                    logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                    continue
                elif not utils.is_domain_wanted(blocked):
                    logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                    continue

                logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
                blocked = utils.deobfuscate(blocked, domain)
                logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)

                logger.debug("Checking %d blockdict records ...", len(blockdict))
                for block in blockdict:
                    logger.debug("block[blocked]='%s',blocked='%s'", block["blocked"], blocked)
                    if block["blocked"] == blocked:
                        logger.debug("Updating reason='%s' for blocked='%s'", reason, block["blocked"])
                        block["reason"] = reason

    elif "quarantined_instances_info" in data and "quarantined_instances" in data["quarantined_instances_info"]:
        logger.debug("Found 'quarantined_instances_info' in JSON response: domain='%s'", domain)
        found = True
        block_level = "quarantined"

        #print(data["quarantined_instances_info"])
        rows = data["quarantined_instances_info"]["quarantined_instances"]
        for blocked in rows:
            logger.debug("blocked='%s' - BEFORE!", blocked)

            # Check for a reason entry before accessing it
            if "reason" not in rows[blocked]:
                logger.warning("Cannot find 'reason' for blocked='%s' in rows()=%d,domain='%s' - BREAK!", blocked, len(rows), domain)
                break

            reason = tidyup.reason(rows[blocked]["reason"])
            blocked = tidyup.domain(blocked)
            logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)

            if blocked == "":
                logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                continue
            elif not utils.is_domain_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Invoking utils.deobfuscate(%s, %s) ...", blocked, domain)
            blocked = utils.deobfuscate(blocked, domain)

            logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
            if not utils.is_domain_wanted(blocked):
                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                continue

            logger.debug("Checking %d blockdict records ...", len(blockdict))
            for block in blockdict:
                logger.debug("block[blocked]='%s',blocked='%s'", block["blocked"], blocked)
                if block["blocked"] == blocked:
                    logger.debug("Updating reason='%s' for blocked='%s'", reason, block["blocked"])
                    block["reason"] = reason
    else:
        logger.warning("Cannot find 'mrf_simple_info' or 'quarantined_instances_info' in JSON reply: domain='%s'", domain)

    if not found:
        logger.debug("Did not find any useable JSON elements, domain='%s', continuing with /about page ...", domain)
        blocklist = fetch_blocks_from_about(domain)

        logger.debug("blocklist()=%d", len(blocklist))
        if len(blocklist) > 0:
            logger.info("Checking %d different blocklists ...", len(blocklist))
            for block_level in blocklist:
                logger.debug("block_level='%s'", block_level)
                rows = blocklist[block_level]

                logger.debug("rows[%s]()=%d", type(rows), len(rows))
                for block in rows:
                    logger.debug("Invoking utils.deobfuscate(%s, %s) ...", block["blocked"], domain)
                    block["blocked"] = utils.deobfuscate(block["blocked"], domain)

                    logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"])
                    if not utils.is_domain_wanted(block["blocked"]):
                        logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
                        continue

                    logger.debug("Appending blocker='%s',block[blocked]='%s',block[reason]='%s',block_level='%s' ...", domain, block["blocked"], block["reason"], block_level)
                    blockdict.append({
                        "blocker"    : domain,
                        "blocked"    : block["blocked"],
                        "reason"     : block["reason"],
                        "block_level": block_level,
                    })

    logger.debug("blockdict()=%d - EXIT!", len(blockdict))
    return blockdict

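# Minimal usage sketch (illustrative only; the domain and nodeinfo URL are
# placeholders and their exact form depends on the instance):
#
#   blockdict = fetch_blocks("pleroma.example", "https://pleroma.example/nodeinfo/2.1.json")
#   for block in blockdict:
#       print(block["blocked"], block["block_level"], block["reason"])
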
def fetch_blocks_from_about(domain: str) -> dict:
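    """
    Scrapes the /about page of the given Pleroma instance and parses its block
    tables (one table per <h2> header, with the header text normalized through
    language_mapping above).

    Returns a dict mapping block levels to lists of {"blocked", "reason"}
    entries, or an empty dict if the page could not be fetched or contains no
    <h2> headers.
    """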
    logger.debug("domain='%s' - CALLED!", domain)
    domain_helper.raise_on(domain)

    logger.debug("Fetching blocks from /about page of domain='%s'", domain)
    doc = None
    for path in ["/instance/about/index.html"]:
        try:
            # Reset the parsed document for each path attempted
            doc = None

            logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
            response = network.fetch_response(
                domain,
                path,
                network.web_headers,
                (config.get("connection_timeout"), config.get("read_timeout"))
            )

            logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
            if not response.ok or response.text.strip() == "":
                logger.warning("path='%s' does not exist on domain='%s' - SKIPPED!", path, domain)
                continue

            logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
            doc = bs4.BeautifulSoup(
                response.text,
                "html.parser",
            )

            logger.debug("doc[]='%s'", type(doc))
            if doc.find("h2") is not None:
                logger.debug("Found 'h2' header in path='%s' - BREAK!", path)
                break

        except network.exceptions as exception:
            logger.warning("Cannot fetch from domain='%s',exception[%s]='%s'", domain, type(exception), str(exception))
            instances.set_last_error(domain, exception)
            break

    blocklist = {
        "reject"        : [],
        "filtered_media": [],
        "followers_only": [],
        "silenced"      : [],
        "media_nsfw"    : [],
        "media_removal" : [],
        "federated_timeline_removal": [],
    }

    logger.debug("doc[]='%s'", type(doc))
    if doc is None:
        logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
        return dict()

    headers = doc.find_all("h2")

    logger.debug("headers[]='%s'", type(headers))
    if len(headers) == 0:
        logger.warning("Cannot find any 'h2' headers on /about page of domain='%s' - EXIT!", domain)
        return dict()
    logger.info("Checking %d headers ...", len(headers))
    for header in headers:
        logger.debug("header[%s]='%s'", type(header), header)
        block_level = tidyup.reason(header.text).lower()

        logger.debug("block_level='%s' - BEFORE!", block_level)
        if block_level in language_mapping:
            logger.debug("block_level='%s' - FOUND!", block_level)
            block_level = language_mapping[block_level].lower()
        else:
            logger.warning("block_level='%s' not found in language mapping table", block_level)

        logger.debug("block_level='%s' - AFTER!", block_level)
        if block_level in blocklist:
            # find_next() is used instead of find_next_siblings() to account for instances that e.g. hide lists in a dropdown menu
            logger.debug("Found block_level='%s', importing domain blocks ...", block_level)
            table = header.find_next("table")
            if table is None:
                logger.warning("Cannot find a table after header for block_level='%s',domain='%s' - SKIPPED!", block_level, domain)
                continue

            for line in table.find_all("tr")[1:]:
                logger.debug("line[]='%s'", type(line))
                blocked = tidyup.domain(line.find_all("td")[0].text)
                reason = tidyup.reason(line.find_all("td")[1].text)

                logger.debug("Appending block_level='%s',blocked='%s',reason='%s' ...", block_level, blocked, reason)
                blocklist[block_level].append({
                    "blocked": blocked,
                    "reason" : reason,
                })
        else:
            logger.warning("block_level='%s' not found in blocklist()=%d", block_level, len(blocklist))

    logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
    return blocklist
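
# For reference, fetch_blocks_from_about() returns a structure of this shape
# (domain and reason values are placeholders):
#
#   {
#       "reject"        : [{"blocked": "bad.example", "reason": "spam"}],
#       "silenced"      : [],
#       ...
#   }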