]> git.mxchange.org Git - fba.git/blob - fba/networks/mastodon.py
Continued:
[fba.git] / fba / networks / mastodon.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18 import validators
19
20 import bs4
21
22 from fba.helpers import config
23 from fba.helpers import domain as domain_helper
24 from fba.helpers import tidyup
25
26 from fba.http import federation
27 from fba.http import network
28
29 from fba.models import blocks
30 from fba.models import instances
31
32 logging.basicConfig(level=logging.INFO)
33 logger = logging.getLogger(__name__)
34
35 # Language mapping X -> English
36 language_mapping = {
37     # English -> English
38     "Silenced instances"            : "Silenced servers",
39     "Suspended instances"           : "Suspended servers",
40     "Limited instances"             : "Limited servers",
41     "Filtered media"                : "Filtered media",
42     # Mappuing German -> English
43     "Gesperrte Server"              : "Suspended servers",
44     "Gefilterte Medien"             : "Filtered media",
45     "Stummgeschaltete Server"       : "Silenced servers",
46     # Japanese -> English
47     "停止済みのサーバー"            : "Suspended servers",
48     "制限中のサーバー"              : "Limited servers",
49     "メディアを拒否しているサーバー": "Filtered media",
50     "サイレンス済みのサーバー"      : "Silenced servers",
51     # ??? -> English
52     "שרתים מושעים"                  : "Suspended servers",
53     "מדיה מסוננת"                   : "Filtered media",
54     "שרתים מוגבלים"                 : "Silenced servers",
55     # French -> English
56     "Serveurs suspendus"            : "Suspended servers",
57     "Médias filtrés"                : "Filtered media",
58     "Serveurs limités"              : "Limited servers",
59     "Serveurs modérés"              : "Limited servers",
60 }
61
62 def fetch_blocks_from_about(domain: str) -> dict:
63     logger.debug("domain='%s' - CALLED!", domain)
64     domain_helper.raise_on(domain)
65
66     logger.debug("Fetching mastodon blocks from domain='%s'", domain)
67     doc = None
68     for path in ["/about/more", "/about"]:
69         try:
70             logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
71             doc = bs4.BeautifulSoup(
72                 network.fetch_response(
73                     domain,
74                     path,
75                     network.web_headers,
76                     (config.get("connection_timeout"), config.get("read_timeout"))
77                 ).text,
78                 "html.parser",
79             )
80
81             if len(doc.find_all("h3")) > 0:
82                 logger.debug("path='%s' had some headlines - BREAK!", path)
83                 break
84
85         except network.exceptions as exception:
86             logger.warning("Cannot fetch from domain='%s',exception='%s'", domain, type(exception))
87             instances.set_last_error(domain, exception)
88             break
89
90     blocklist = {
91         "Suspended servers": [],
92         "Filtered media"   : [],
93         "Limited servers"  : [],
94         "Silenced servers" : [],
95     }
96
97     logger.debug("doc[]='%s'", type(doc))
98     if doc is None:
99         logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
100         return list()
101
102     for header in doc.find_all("h3"):
103         header_text = tidyup.reason(header.text)
104
105         logger.debug("header_text='%s'", header_text)
106         if header_text in language_mapping:
107             logger.debug("Translating header_text='%s' ...", header_text)
108             header_text = language_mapping[header_text]
109         else:
110             logger.warning("header_text='%s' not found in language mapping table", header_text)
111
112         if header_text in blocklist or header_text.lower() in blocklist:
113             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
114             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
115                 blocklist[header_text].append({
116                     "domain": tidyup.domain(line.find("span").text),
117                     "hash"  : tidyup.domain(line.find("span")["title"][9:]),
118                     "reason": tidyup.reason(line.find_all("td")[1].text),
119                 })
120         else:
121             logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist))
122
123     logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
124     return {
125         "reject"        : blocklist["Suspended servers"],
126         "media_removal" : blocklist["Filtered media"],
127         "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
128     }
129
130 def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
131     logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
132     domain_helper.raise_on(domain)
133
134     if not isinstance(nodeinfo_url, str):
135         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
136     elif nodeinfo_url == "":
137         raise ValueError("Parameter 'nodeinfo_url' is empty")
138
139     blocklist = list()
140
141     logger.debug("Invoking federation.fetch_blocks(%s) ...", domain)
142     rows = federation.fetch_blocks(domain)
143
144     logger.debug("rows[%s]()=%d", type(rows), len(rows))
145     if len(rows) == 0:
146         logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain)
147         rows = fetch_blocks_from_about(domain)
148
149     logger.debug("rows[%s]()=%d", type(rows), len(rows))
150     if len(rows) > 0:
151         logger.debug("Checking %d entries from domain='%s' ...", len(rows), domain)
152         for block in rows:
153             # Check type
154             logger.debug("block[]='%s'", type(block))
155             if not isinstance(block, dict):
156                 logger.debug("block[]='%s' is of type 'dict' - SKIPPED!", type(block))
157                 continue
158             elif "domain" not in block:
159                 logger.warning("block()=%d does not contain element 'domain' - SKIPPED!", len(block))
160                 continue
161             elif "severity" not in block:
162                 logger.warning("block()=%d does not contain element 'severity' - SKIPPED!", len(block))
163                 continue
164             elif block["severity"] in ["accept", "accepted"]:
165                 logger.debug("block[domain]='%s' has unwanted severity level '%s' - SKIPPED!", block["domain"], block["severity"])
166                 continue
167             elif "digest" in block and not validators.hashes.sha256(block["digest"]):
168                 logger.warning("block[domain]='%s' has invalid block[digest]='%s' - SKIPPED!", block["domain"], block["digest"])
169                 continue
170
171             reason = tidyup.reason(block["comment"]) if "comment" in block and block["comment"] is not None and block["comment"] != "" else None
172
173             logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block["domain"], reason, block["severity"])
174             blocklist.append({
175                 "blocker"    : domain,
176                 "blocked"    : block["domain"],
177                 "hash"       : block["digest"] if "digest" in block else None,
178                 "reason"     : reason,
179                 "block_level": blocks.alias_block_level(block["severity"]),
180             })
181     else:
182         logger.debug("domain='%s' has no block list", domain)
183
184     logger.debug("blocklist()=%d - EXIT!", len(blocklist))
185     return blocklist