1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
26 from fba.helpers import config
27 from fba.helpers import cookies
28 from fba.helpers import domain as domain_helper
29 from fba.helpers import json as json_helper
31 from fba.models import instances
33 logging.basicConfig(level=logging.INFO)
34 logger = logging.getLogger(__name__)
36 # HTTP headers for non-API requests
38 "User-Agent": config.get("useragent"),
41 # HTTP headers for API requests
43 "User-Agent" : config.get("useragent"),
44 "Content-Type": "application/json; charset=utf-8",
47 # Exceptions to always catch
49 requests.exceptions.ChunkedEncodingError,
50 requests.exceptions.ConnectionError,
51 requests.exceptions.InvalidSchema,
52 requests.exceptions.InvalidURL,
53 requests.exceptions.Timeout,
54 requests.exceptions.TooManyRedirects,
56 urllib3.exceptions.LocationParseError
59 def post_json_api(domain: str, path: str, data: str = "", headers: dict = dict()) -> dict:
60 logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers))
61 domain_helper.raise_on(domain)
63 if not isinstance(path, str):
64 raise ValueError(f"path[]='{type(path)}' is not of type 'str'")
66 raise ValueError("Parameter 'path' is empty")
67 elif not isinstance(data, str):
68 raise ValueError(f"data[]='{type(data)}' is not of type 'str'")
69 elif not isinstance(headers, dict):
70 raise ValueError(f"headers[]='{type(headers)}' is not of type 'list'")
77 logger.debug("Sending POST to domain='%s',path='%s',data='%s',headers(%d)='%s'", domain, path, data, len(headers), headers)
78 response = reqto.post(
79 f"https://{domain}{path}",
81 headers={**api_headers, **headers},
82 timeout=(config.get("connection_timeout"), config.get("read_timeout")),
83 cookies=cookies.get_all(domain),
87 logger.debug("Parsing JSON response from domain='%s',path='%s' ...", domain, path)
88 json_reply["json"] = json_helper.from_response(response)
90 logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
91 if not response.ok or response.status_code >= 300 or len(response.text.strip()) == 0:
92 logger.debug("Cannot query JSON API: domain='%s',path='%s',data()=%d,response.status_code=%d,response.text()=%d", domain, path, len(data), response.status_code, len(response.text))
93 json_reply["status_code"] = response.status_code
94 json_reply["error_message"] = response.reason
95 instances.set_last_error(domain, response)
96 del json_reply["json"]
98 except exceptions as exception:
99 logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
100 json_reply["status_code"] = 999
101 json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
102 json_reply["exception"] = exception
103 instances.set_last_error(domain, exception)
106 logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
109 def fetch_api_url(url: str, timeout: tuple) -> dict:
110 logger.debug("url='%s',timeout()=%d - CALLED!", url, len(timeout))
111 if not isinstance(url, str):
112 raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
114 raise ValueError("Parameter 'url' is empty")
115 elif not isinstance(timeout, tuple):
116 raise ValueError(f"timeout[]='{type(timeout)}' is not of type 'tuple'")
123 logger.debug("Fetching url='%s' ...", url)
124 response = utils.fetch_url(url, api_headers, timeout)
126 logger.debug("Parsing JSON response from url='%s' ...", url)
127 json_reply["json"] = json_helper.from_response(response)
129 logger.debug("response.ok='%s',response.status_code='%s',response.text()=%d", response.ok, response.status_code, len(response.text))
130 if not response.ok or response.status_code >= 300 or len(response.text) == 0:
131 logger.warning("Cannot query JSON API: url='%s',response.status_code=%d,response.text()=%d", url, response.status_code, len(response.text))
132 json_reply["status_code"] = response.status_code
133 json_reply["error_message"] = response.reason
134 del json_reply["json"]
136 except exceptions as exception:
137 logger.debug("Fetching url='%s' failed. exception[%s]='%s'", url, type(exception), str(exception))
138 json_reply["status_code"] = 999
139 json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
140 json_reply["exception"] = exception
143 logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
146 def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
147 logger.debug("domain='%s',path='%s',timeout()=%d - CALLED!", domain, path, len(timeout))
148 domain_helper.raise_on(domain)
150 if not isinstance(path, str):
151 raise ValueError(f"path[]='{type(path)}' is not of type 'str'")
153 raise ValueError("Parameter 'path' is empty")
154 elif not isinstance(headers, dict):
155 raise ValueError(f"headers[]='{type(headers)}' is not of type 'list'")
156 elif not isinstance(timeout, tuple):
157 raise ValueError(f"timeout[]='{type(timeout)}' is not of type 'tuple'")
164 logger.debug("Sending GET to domain='%s',path='%s',timeout(%d)='%s'", domain, path, len(timeout), timeout)
165 response = fetch_response(domain, path, headers, timeout)
166 except exceptions as exception:
167 logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
168 json_reply["status_code"] = 999
169 json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
170 json_reply["exception"] = exception
171 instances.set_last_error(domain, exception)
174 logger.debug("Parsing JSON response from domain='%s',path='%s' ...", domain, path)
175 json_reply["json"] = json_helper.from_response(response)
177 logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
178 if not response.ok or response.status_code >= 300 or len(response.text) == 0:
179 logger.debug("Cannot query JSON API: domain='%s',path='%s',response.status_code=%d,response.text()=%d", domain, path, response.status_code, len(response.text))
180 json_reply["status_code"] = response.status_code
181 json_reply["error_message"] = response.reason
182 instances.set_last_error(domain, response)
183 del json_reply["json"]
185 logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
188 def send_bot_post(domain: str, blocklist: list):
189 logger.debug("domain='%s',blocklist()=%d - CALLED!", domain, len(blocklist))
190 domain_helper.raise_on(domain)
192 if not isinstance(blocklist, list):
193 raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not of type 'list'")
194 elif len(blocklist) == 0:
195 raise ValueError("Parameter 'blocklist' is empty")
197 message = f"{domain} has blocked the following instances:\n\n"
200 if len(blocklist) > 20:
202 blocklist = blocklist[0 : 19]
204 logger.debug("blocklist()=%d", len(blocklist))
205 for block in blocklist:
206 logger.debug("block[%s]='%s'", type(block), block)
207 if block["reason"] is None or block["reason"] == '':
208 logger.debug("block[blocked]='%s' is being blocked with no reason specified", block["blocked"])
209 message = message + block["blocked"] + " with unspecified reason\n"
211 logger.debug("block[reason]()=%d", len(block["reason"]))
212 if len(block["reason"]) > 420:
213 block["reason"] = block["reason"][0:419] + "[…]"
215 message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
218 message = message + "(the list has been truncated to the first 20 entries)"
220 response = reqto.post(
221 f"{config.get('bot_instance')}/api/v1/statuses",
224 "visibility" : config.get("bot_visibility"),
225 "content_type": "text/plain"
227 headers={**api_headers, **{"Authorization": "Bearer " + config.get("bot_token")}},
228 timeout=(config.get("connection_timeout"), config.get("read_timeout")),
229 allow_redirects=False
232 logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
233 return response.ok and response.status_code < 300 and response.text.strip() != ""
235 def fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response:
236 logger.debug("domain='%s',path='%s',headers()=%d,timeout='%s',allow_redirects='%s' - CALLED!", domain, path, len(headers), timeout, allow_redirects)
237 domain_helper.raise_on(domain)
239 if not isinstance(path, str):
240 raise ValueError(f"Parameter path[]='{type(path)}' is not of type 'str'")
242 raise ValueError("Parameter 'path' is empty")
243 elif not isinstance(headers, dict):
244 raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'")
245 elif not isinstance(timeout, tuple):
246 raise ValueError(f"timeout[]='{type(timeout)}' is not of type 'tuple'")
249 logger.debug("Sending GET request to '%s%s' ...", domain, path)
250 response = reqto.get(
251 f"https://{domain}{path}",
254 cookies=cookies.get_all(domain),
255 allow_redirects=allow_redirects
258 except exceptions as exception:
259 logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
260 instances.set_last_error(domain, exception)
263 logger.debug("response[]='%s' - EXIT!", type(response))