]> git.mxchange.org Git - fba.git/blob - fba/networks/mastodon.py
Continued:
[fba.git] / fba / networks / mastodon.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import inspect
18 import logging
19
20 import bs4
21
22 from fba import csrf
23 from fba import database
24 from fba import utils
25
26 from fba.helpers import blacklist
27 from fba.helpers import config
28 from fba.helpers import domain as domain_helper
29 from fba.helpers import tidyup
30
31 from fba.http import network
32
33 from fba.models import blocks
34 from fba.models import instances
35
36 logging.basicConfig(level=logging.INFO)
37 logger = logging.getLogger(__name__)
38
39 # Language mapping X -> English
40 language_mapping = {
41     # English -> English
42     "Silenced instances"            : "Silenced servers",
43     "Suspended instances"           : "Suspended servers",
44     "Limited instances"             : "Limited servers",
45     "Filtered media"                : "Filtered media",
46     # Mappuing German -> English
47     "Gesperrte Server"              : "Suspended servers",
48     "Gefilterte Medien"             : "Filtered media",
49     "Stummgeschaltete Server"       : "Silenced servers",
50     # Japanese -> English
51     "停止済みのサーバー"            : "Suspended servers",
52     "制限中のサーバー"              : "Limited servers",
53     "メディアを拒否しているサーバー": "Filtered media",
54     "サイレンス済みのサーバー"      : "Silenced servers",
55     # ??? -> English
56     "שרתים מושעים"                  : "Suspended servers",
57     "מדיה מסוננת"                   : "Filtered media",
58     "שרתים מוגבלים"                 : "Silenced servers",
59     # French -> English
60     "Serveurs suspendus"            : "Suspended servers",
61     "Médias filtrés"                : "Filtered media",
62     "Serveurs limités"              : "Limited servers",
63     "Serveurs modérés"              : "Limited servers",
64 }
65
66 def fetch_blocks_from_about(domain: str) -> dict:
67     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
68     domain_helper.raise_on(domain)
69
70     logger.debug("Fetching mastodon blocks from domain='%s'", domain)
71     doc = None
72     for path in ["/about/more", "/about"]:
73         try:
74             logger.debug(f"Fetching path='{path}' from domain='{domain}' ...")
75             doc = bs4.BeautifulSoup(
76                 network.fetch_response(
77                     domain,
78                     path,
79                     network.web_headers,
80                     (config.get("connection_timeout"), config.get("read_timeout"))
81                 ).text,
82                 "html.parser",
83             )
84
85             if len(doc.find_all("h3")) > 0:
86                 logger.debug(f"path='{path}' had some headlines - BREAK!")
87                 break
88
89         except network.exceptions as exception:
90             logger.warning("Cannot fetch from domain='%s',exception='%s'", domain, type(exception))
91             instances.set_last_error(domain, exception)
92             break
93
94     blocklist = {
95         "Suspended servers": [],
96         "Filtered media"   : [],
97         "Limited servers"  : [],
98         "Silenced servers" : [],
99     }
100
101     logger.debug("doc[]='%s'", type(doc))
102     if doc is None:
103         logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
104         return list()
105
106     for header in doc.find_all("h3"):
107         header_text = tidyup.reason(header.text)
108
109         logger.debug("header_text='%s'", header_text)
110         if header_text in language_mapping:
111             logger.debug("Translating header_text='%s' ...", header_text)
112             header_text = language_mapping[header_text]
113         else:
114             logger.warning("header_text='%s' not found in language mapping table", header_text)
115
116         if header_text in blocklist or header_text.lower() in blocklist:
117             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
118             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
119                 blocklist[header_text].append({
120                     "domain": tidyup.domain(line.find("span").text),
121                     "hash"  : tidyup.domain(line.find("span")["title"][9:]),
122                     "reason": tidyup.reason(line.find_all("td")[1].text),
123                 })
124         else:
125             logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist))
126
127     logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
128     return {
129         "reject"        : blocklist["Suspended servers"],
130         "media_removal" : blocklist["Filtered media"],
131         "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
132     }
133
134 def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
135     logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
136     domain_helper.raise_on(domain)
137
138     if not isinstance(nodeinfo_url, str):
139         raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not 'str'")
140     elif nodeinfo_url == "":
141         raise ValueError("Parameter 'nodeinfo_url' is empty")
142
143     # Init block list
144     blocklist = list()
145
146     # No CSRF by default, you don't have to add network.api_headers by yourself here
147     headers = tuple()
148
149     try:
150         logger.debug("Checking CSRF for domain='%s'", domain)
151         headers = csrf.determine(domain, dict())
152     except network.exceptions as exception:
153         logger.warning("Exception '%s' during checking CSRF (fetch_blocks,%s) - EXIT!", type(exception), __name__)
154         instances.set_last_error(domain, exception)
155         return blocklist
156
157     try:
158         # json endpoint for newer mastodongs
159         logger.debug("Querying API domain_blocks: domain='%s'", domain)
160         data = network.get_json_api(
161             domain,
162             "/api/v1/instance/domain_blocks",
163             headers,
164             (config.get("connection_timeout"), config.get("read_timeout"))
165         )
166
167         logger.debug("data[]='%s'", type(data))
168         if "error_message" in data:
169             logger.debug("Was not able to fetch domain_blocks from domain='%s': status_code='%d',error_message='%s'", domain, data['status_code'], data['error_message'])
170             instances.set_last_error(domain, data)
171             return blocklist
172         elif "json" in data and "error" in data["json"]:
173             logger.warning("JSON API returned error message: '%s'", data['json']['error'])
174             instances.set_last_error(domain, data)
175             return blocklist
176         else:
177             # Getting blocklist
178             rows = data["json"]
179
180         if len(rows) == 0:
181             logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain)
182             rows = fetch_blocks_from_about(domain)
183
184         if len(rows) > 0:
185             logger.info("Checking %d entries from domain='%s' ...", len(rows), domain)
186             for block in rows:
187                 # Check type
188                 logger.debug("block[]='%s'", type(block))
189                 if not isinstance(block, dict):
190                     logger.debug(f"block[]='{type(block)}' is of type 'dict' - SKIPPED!")
191                     continue
192
193                 reason = tidyup.reason(block["comment"]) if "comment" in block and block['comment'] is not None and block['comment'] != "" else None
194
195                 logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block['domain'], reason, block['severity'])
196                 blocklist.append({
197                     "blocker"    : domain,
198                     "blocked"    : block["domain"],
199                     "hash"       : block["digest"],
200                     "reason"     : reason,
201                     "block_level": block["severity"]
202                 })
203         else:
204             logger.debug("domain='%s' has no block list")
205
206     except network.exceptions as exception:
207         logger.warning("domain='%s',exception[%s]='%s'", domain, type(exception), str(exception))
208         instances.set_last_error(domain, exception)
209
210     logger.debug("blocklist()=%d - EXIT!", len(blocklist))
211     return blocklist