]> git.mxchange.org Git - fba.git/blob - fba/networks/mastodon.py
Continued:
[fba.git] / fba / networks / mastodon.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import logging
18 import validators
19
20 import bs4
21
22 from fba.helpers import config
23 from fba.helpers import domain as domain_helper
24 from fba.helpers import tidyup
25
26 from fba.http import federation
27 from fba.http import network
28
29 from fba.models import blocks
30 from fba.models import instances
31
32 logging.basicConfig(level=logging.INFO)
33 logger = logging.getLogger(__name__)
34
35 # Language mapping X -> English
36 language_mapping = {
37     # English -> English
38     "Silenced instances"            : "Silenced servers",
39     "Suspended instances"           : "Suspended servers",
40     "Limited instances"             : "Limited servers",
41     "Filtered media"                : "Filtered media",
42     # Mappuing German -> English
43     "Gesperrte Server"              : "Suspended servers",
44     "Gefilterte Medien"             : "Filtered media",
45     "Stummgeschaltete Server"       : "Silenced servers",
46     # Japanese -> English
47     "停止済みのサーバー"            : "Suspended servers",
48     "制限中のサーバー"              : "Limited servers",
49     "メディアを拒否しているサーバー": "Filtered media",
50     "サイレンス済みのサーバー"      : "Silenced servers",
51     # ??? -> English
52     "שרתים מושעים"                  : "Suspended servers",
53     "מדיה מסוננת"                   : "Filtered media",
54     "שרתים מוגבלים"                 : "Silenced servers",
55     # French -> English
56     "Serveurs suspendus"            : "Suspended servers",
57     "Médias filtrés"                : "Filtered media",
58     "Serveurs limités"              : "Limited servers",
59     "Serveurs modérés"              : "Limited servers",
60 }
61
62 def fetch_blocks_from_about(domain: str) -> dict:
63     logger.debug("domain='%s' - CALLED!", domain)
64     domain_helper.raise_on(domain)
65
66     if not instances.is_registered(domain):
67         raise Exception(f"domain='{domain}' is not registered but function is invoked.")
68
69     logger.debug("Fetching mastodon blocks from domain='%s'", domain)
70     doc = None
71     for path in ["/about/more", "/about"]:
72         try:
73             logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
74             doc = bs4.BeautifulSoup(
75                 network.fetch_response(
76                     domain,
77                     path,
78                     network.web_headers,
79                     (config.get("connection_timeout"), config.get("read_timeout"))
80                 ).text,
81                 "html.parser",
82             )
83
84             if len(doc.find_all("h3")) > 0:
85                 logger.debug("path='%s' had some headlines - BREAK!", path)
86                 break
87
88         except network.exceptions as exception:
89             logger.warning("Cannot fetch from domain='%s',exception='%s'", domain, type(exception))
90             instances.set_last_error(domain, exception)
91             break
92
93     blocklist = {
94         "Suspended servers": [],
95         "Filtered media"   : [],
96         "Limited servers"  : [],
97         "Silenced servers" : [],
98     }
99
100     logger.debug("doc[]='%s'", type(doc))
101     if doc is None:
102         logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
103         return list()
104
105     for header in doc.find_all("h3"):
106         header_text = tidyup.reason(header.text)
107
108         logger.debug("header_text='%s'", header_text)
109         if header_text in language_mapping:
110             logger.debug("Translating header_text='%s' ...", header_text)
111             header_text = language_mapping[header_text]
112         else:
113             logger.warning("header_text='%s' not found in language mapping table", header_text)
114
115         if header_text in blocklist or header_text.lower() in blocklist:
116             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
117             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
118                 domain = line.find("span").text
119                 hash   = line.find("span")["title"][9:]
120                 reason = line.find_all("td")[1].text
121
122                 logger.debug("domain='%s',reason='%s' - BEFORE!", domain, reason)
123                 domain = tidyup.domain(domain) if domain != "" else None
124                 reason = tidyup.reason(reason) if reason != "" else None
125
126                 logger.debug("domain='%s',reason='%s' - AFTER!", domain, reason)
127                 if domain is None:
128                     logger.warning("domain is empty,line='%s' - SKIPPED!", line)
129                     continue
130                 elif domain == "":
131                     logger.warning("domain is an empty string,line='%s' - SKIPPED!", line)
132                     continue
133
134                 logger.debug("Appending domain='%s',hash='%s',reason='%s' to blocklist header_text='%s' ...", domain, hash, reason, blocklist)
135                 blocklist[header_text].append({
136                     "domain": domain,
137                     "hash"  : hash,
138                     "reason": reason,
139                 })
140         else:
141             logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist))
142
143     logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
144     return {
145         "reject"        : blocklist["Suspended servers"],
146         "media_removal" : blocklist["Filtered media"],
147         "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
148     }
149
150 def fetch_blocks(domain: str) -> list:
151     logger.debug("domain='%s' - CALLED!", domain)
152     domain_helper.raise_on(domain)
153
154     if not instances.is_registered(domain):
155         raise Exception(f"domain='{domain}' is not registered but function is invoked.")
156
157     blocklist = list()
158
159     logger.debug("Invoking federation.fetch_blocks(%s) ...", domain)
160     rows = federation.fetch_blocks(domain)
161
162     logger.debug("rows[%s]()=%d", type(rows), len(rows))
163     if len(rows) == 0:
164         logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain)
165         rows = fetch_blocks_from_about(domain)
166
167     logger.debug("rows[%s]()=%d", type(rows), len(rows))
168     if len(rows) > 0:
169         logger.debug("Checking %d entries from domain='%s' ...", len(rows), domain)
170         for block in rows:
171             # Check type
172             logger.debug("block[]='%s'", type(block))
173             if not isinstance(block, dict):
174                 logger.debug("block[]='%s' is of type 'dict' - SKIPPED!", type(block))
175                 continue
176             elif "domain" not in block:
177                 logger.debug("block='%s'", block)
178                 logger.warning("block()=%d does not contain element 'domain' - SKIPPED!", len(block))
179                 continue
180             elif not domain_helper.is_wanted(block["domain"]):
181                 logger.debug("block[domain]='%s' is not wanted - SKIPPED!", block["domain"])
182                 continue
183             elif "severity" not in block:
184                 logger.warning("block()=%d does not contain element 'severity' - SKIPPED!", len(block))
185                 continue
186             elif block["severity"] in ["accept", "accepted"]:
187                 logger.debug("block[domain]='%s' has unwanted severity level '%s' - SKIPPED!", block["domain"], block["severity"])
188                 continue
189             elif "digest" in block and not validators.hashes.sha256(block["digest"]):
190                 logger.warning("block[domain]='%s' has invalid block[digest]='%s' - SKIPPED!", block["domain"], block["digest"])
191                 continue
192
193             reason = tidyup.reason(block["comment"]) if "comment" in block and block["comment"] is not None and block["comment"] != "" else None
194
195             logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block["domain"], reason, block["severity"])
196             blocklist.append({
197                 "blocker"    : domain,
198                 "blocked"    : block["domain"],
199                 "hash"       : block["digest"] if "digest" in block else None,
200                 "reason"     : reason,
201                 "block_level": blocks.alias_block_level(block["severity"]),
202             })
203     else:
204         logger.debug("domain='%s' has no block list", domain)
205
206     logger.debug("blocklist()=%d - EXIT!", len(blocklist))
207     return blocklist