1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
23 from fba import config
26 from fba.helpers import cookies
28 from fba.models import instances
# NOTE(review): every line of this listing starts with its original line
# number, and some numbers are skipped (30 -> 32, 32 -> 35, 48 -> 50, ...).
# The lines that name, open and close the three literals below are among the
# skipped ones, so their identifiers cannot be read off this block.
30 # HTTP headers for non-API requests
# Only a User-Agent entry is visible; its value is the "useragent" config key.
32 "User-Agent": config.get("useragent"),
35 # HTTP headers for API requests
# Same User-Agent plus a JSON Content-Type. Presumably this is the mapping the
# request helpers call `api_headers` and merge with per-call headers -- confirm.
37 "User-Agent" : config.get("useragent"),
38 "Content-Type": "application/json",
41 # Exceptions to always catch
# Error classes from `requests` and `urllib3`. Presumably this is the
# collection that the `except exceptions as exception` handlers further down
# refer to -- confirm. Original line 49 is not visible, so one more entry may
# exist.
43 requests.exceptions.ChunkedEncodingError,
44 requests.exceptions.ConnectionError,
45 requests.exceptions.InvalidSchema,
46 requests.exceptions.InvalidURL,
47 requests.exceptions.Timeout,
48 requests.exceptions.TooManyRedirects,
50 urllib3.exceptions.LocationParseError
# post_json_api: send a POST request to https://{domain}{path} and describe
# the outcome in the dict `json_reply`.
#
# Parameters
#   domain  -- host part of the URL; checked with validators.domain() on the
#              text before the first "/", and rejected when it ends in
#              ".arpa" or ".tld"
#   path    -- appended verbatim to "https://{domain}"
#   data    -- must be a str; presumably sent as the request body (the
#              argument line, original line 82, is not visible -- confirm)
#   headers -- extra headers merged over `api_headers`
#
# Raises
#   ValueError for every failed argument check below.
#
# NOTE(review): this listing omits short lines (the embedded numbers skip),
# among them the conditions for the two "empty" errors, the creation of
# `json_reply`, the `try:` line, whatever follows the handler on original
# lines 104-105, and the final return. Whether a caught exception is re-raised
# or reported only through `json_reply` cannot be determined from here.
# NOTE(review): `headers: dict = {}` is a mutable default. The visible code
# only reads it, so it is harmless today, but it is shared between calls.
53 def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict:
54 # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!")
55 if not isinstance(domain, str):
56 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
# Condition on original line 57 is not visible; presumably `domain == ""`.
58 raise ValueError("Parameter 'domain' is empty")
# Only the text before the first "/" is validated as a domain name.
59 elif not validators.domain(domain.split("/")[0]):
60 raise ValueError(f"domain='{domain}' is not a valid domain")
61 elif domain.endswith(".arpa"):
62 raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
63 elif domain.endswith(".tld"):
64 raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
65 elif not isinstance(path, str):
66 raise ValueError(f"path[]='{type(path)}' is not 'str'")
# Condition on original line 67 is not visible; presumably `path == ""`.
68 raise ValueError("Parameter 'path' cannot be empty")
69 elif not isinstance(data, str):
70 raise ValueError(f"data[]='{type(data)}' is not 'str'")
71 elif not isinstance(headers, dict):
# NOTE(review): the check is for `dict` but the message says 'list'.
72 raise ValueError(f"headers[]='{type(headers)}' is not 'list'")
79 # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',data='{data}',headers({len(headers)})={headers}")
80 response = reqto.post(
81 f"https://{domain}{path}",
# Caller-supplied headers override `api_headers` entries with the same key.
83 headers={**api_headers, **headers},
# (connect, read) timeouts come from configuration, not from the caller.
84 timeout=(config.get("connection_timeout"), config.get("read_timeout")),
# Stored cookies for the domain are sent when there are any.
85 cookies=cookies.get_all(domain) if cookies.has(domain) else {}
# The body is decoded before the status is inspected; "json" is removed again
# below when the response turns out to be an error.
88 json_reply["json"] = json_from_response(response)
90 # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
91 if not response.ok or response.status_code >= 400:
92 print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',data()={len(data)},response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
93 json_reply["status_code"] = response.status_code
94 json_reply["error_message"] = response.reason
95 del json_reply["json"]
# The failing response is recorded for the domain.
96 instances.set_last_error(domain, response)
98 except exceptions as exception:
99 # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
# 999 is the status_code used when the request itself raised.
100 json_reply["status_code"] = 999
101 json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
102 json_reply["exception"] = exception
103 instances.set_last_error(domain, exception)
106 # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
# fetch_api_url: fetch `url` through fba.fetch_url() using `api_headers` and
# describe the outcome in the dict `json_reply`.
#
# Parameters
#   url     -- must be a str; no further validation is visible in this block
#   timeout -- must be a tuple; passed unchanged to fba.fetch_url()
#
# Raises
#   ValueError when `url` is not a str or `timeout` is not a tuple.
#
# Unlike post_json_api() and get_json_api(), no instances.set_last_error()
# call is visible in this function.
#
# NOTE(review): this listing omits short lines (original lines 115-120, 123,
# 125, 132, 138-139 and the return). The creation of `json_reply`, the `try:`
# line and whatever ends the handler are therefore not visible -- confirm
# against the complete file. The import that binds the name `fba` is not
# visible either.
109 def fetch_api_url(url: str, timeout: tuple) -> dict:
110 # DEBUG: print(f"DEBUG: url='{url}',timeout()={len(timeout)} - CALLED!")
111 if not isinstance(url, str):
112 raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
113 elif not isinstance(timeout, tuple):
114 raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
121 # DEBUG: print(f"DEBUG: Fetching url='{url}' ...")
122 response = fba.fetch_url(url, api_headers, timeout)
# The body is decoded before the status is inspected; "json" is removed again
# below when the response turns out to be an error.
124 json_reply["json"] = json_from_response(response)
126 # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
127 if not response.ok or response.status_code >= 400:
128 print(f"WARNING: Cannot query JSON API: url='{url}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
129 json_reply["status_code"] = response.status_code
130 json_reply["error_message"] = response.reason
131 del json_reply["json"]
133 except exceptions as exception:
134 # DEBUG: print(f"DEBUG: Fetching '{url}' failed. exception[{type(exception)}]='{str(exception)}'")
# 999 is the status_code used when the request itself raised.
135 json_reply["status_code"] = 999
136 json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
137 json_reply["exception"] = exception
140 # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
# get_json_api: send a GET request to https://{domain}{path} and describe the
# outcome in the dict `json_reply`.
#
# Parameters
#   domain  -- host part of the URL; checked with validators.domain() on the
#              text before the first "/", and rejected when it ends in
#              ".arpa" or ".tld"
#   path    -- appended verbatim to "https://{domain}"
#   headers -- extra headers merged over `api_headers`
#   timeout -- must be a tuple; presumably forwarded to reqto.get() (the
#              argument line, original line 173, is not visible -- confirm)
#
# Raises
#   ValueError for every failed argument check below.
#
# NOTE(review): this listing omits short lines (the embedded numbers skip).
# In particular original lines 183-184, between the handler and the JSON
# decoding, are missing. `response` is used after the handler, so the handler
# presumably leaves the function or the rest sits in an `else:` branch; which
# of these applies cannot be determined from here.
143 def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
144 # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!")
145 if not isinstance(domain, str):
146 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
# Condition on original line 147 is not visible; presumably `domain == ""`.
148 raise ValueError("Parameter 'domain' is empty")
# Only the text before the first "/" is validated as a domain name.
149 elif not validators.domain(domain.split("/")[0]):
150 raise ValueError(f"domain='{domain}' is not a valid domain")
151 elif domain.endswith(".arpa"):
152 raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
153 elif domain.endswith(".tld"):
154 raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
155 elif not isinstance(path, str):
156 raise ValueError(f"path[]='{type(path)}' is not 'str'")
# Condition on original line 157 is not visible; presumably `path == ""`.
158 raise ValueError("Parameter 'path' cannot be empty")
159 elif not isinstance(headers, dict):
# NOTE(review): the check is for `dict` but the message says 'list'.
160 raise ValueError(f"headers[]='{type(headers)}' is not 'list'")
161 elif not isinstance(timeout, tuple):
162 raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
169 # DEBUG: print(f"DEBUG: Sending GET to domain='{domain}',path='{path}',timeout({len(timeout)})={timeout}")
170 response = reqto.get(
171 f"https://{domain}{path}",
# Caller-supplied headers override `api_headers` entries with the same key.
172 headers={**api_headers, **headers},
# Stored cookies for the domain are sent when there are any.
174 cookies=cookies.get_all(domain) if cookies.has(domain) else {}
177 except exceptions as exception:
178 # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
# 999 is the status_code used when the request itself raised.
179 json_reply["status_code"] = 999
180 json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
181 json_reply["exception"] = exception
182 instances.set_last_error(domain, exception)
# The body is decoded before the status is inspected; "json" is removed again
# below when the response turns out to be an error.
185 json_reply["json"] = json_from_response(response)
187 # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
188 if not response.ok or response.status_code >= 400:
189 print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
190 json_reply["status_code"] = response.status_code
191 json_reply["error_message"] = response.reason
192 del json_reply["json"]
# The failing response is recorded for the domain.
193 instances.set_last_error(domain, response)
195 # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
def _build_block_message(domain: str, blocklist: list) -> str:
    """Render the status text listing the instances blocked by ``domain``.

    At most 20 entries are rendered. Each entry is a mapping with the keys
    ``"blocked"`` and ``"reason"``; the entries themselves are never modified.
    """
    max_entries = 20
    max_reason_length = 420

    parts = [f"{domain} has blocked the following instances:\n\n"]

    for block in blocklist[:max_entries]:
        # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
        reason = block["reason"]
        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        # Shorten over-long reasons in a local copy, not in the caller's entry.
        if len(reason) > max_reason_length:
            reason = reason[0:max_reason_length - 1] + "[…]"

        # A zero-width space after "@" keeps quoted handles from becoming mentions.
        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if len(blocklist) > max_entries:
        parts.append(f"(the list has been truncated to the first {max_entries} entries)")

    return "".join(parts)


def send_bot_post(domain: str, blocklist: list):
    """Post a status listing the instances that ``domain`` has blocked.

    The status is sent to ``{bot_instance}/api/v1/statuses`` using the
    configured ``bot_token`` and ``bot_visibility``.

    Parameters:
        domain: Blocking instance; used only in the message text.
        blocklist: List of mappings with the keys ``"blocked"`` and
            ``"reason"`` (``reason`` may be ``None`` or empty).

    Raises:
        ValueError: If ``domain`` is not a non-empty str, ends in ".tld", or
            ``blocklist`` is not a list.

    Changes against the previous version:
        * ``blocklist`` is validated as a list. The old check demanded a dict,
          which the slicing and per-entry indexing below could never handle.
        * Truncation keeps the first 20 entries, as the message states
          (the old slice ``[0 : 19]`` kept only 19).
        * Entries passed in by the caller are no longer modified when a long
          reason is shortened.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',blocklist()={len(blocklist)} - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif domain.endswith(".tld"):
        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
    elif not isinstance(blocklist, list):
        raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'list'")

    message = _build_block_message(domain, blocklist)

    botheaders = {**api_headers, "Authorization": "Bearer " + config.get("bot_token")}

    # NOTE(review): the reviewed listing did not show the opening of this call,
    # the "status" entry, the header/timeout arguments or the function's final
    # statement. They are reconstructed here: the timeout reuses the configured
    # connection/read timeouts, and True is returned -- confirm against the
    # complete file.
    reqto.post(
        f"{config.get('bot_instance')}/api/v1/statuses",
        data={
            "status": message,
            "visibility": config.get('bot_visibility'),
            "content_type": "text/plain"
        },
        headers=botheaders,
        timeout=(config.get("connection_timeout"), config.get("read_timeout")),
    )

    return True
# fetch_response: send a GET request to https://{domain}{path}. The return
# annotation and the closing debug line indicate that the response object
# itself is handed back (the return line is not visible -- confirm).
#
# Parameters
#   domain  -- host part of the URL; checked with validators.domain() on the
#              text before the first "/", and rejected when it ends in
#              ".arpa" or ".tld"
#   path    -- appended verbatim to "https://{domain}"
#   headers -- must be a dict; presumably passed to reqto.get() (argument
#              lines 270-271 are not visible -- confirm)
#   timeout -- must be a tuple; presumably passed to reqto.get() as well
#
# Raises
#   ValueError for every failed argument check below.
#
# NOTE(review): this listing omits short lines (the embedded numbers skip).
# What follows instances.set_last_error() in the handler (original lines
# 278-279) is not visible, so it cannot be confirmed here whether the caught
# exception is re-raised.
245 def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response:
246 # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
247 if not isinstance(domain, str):
248 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
# Condition on original line 249 is not visible; presumably `domain == ""`.
250 raise ValueError("Parameter 'domain' is empty")
# Only the text before the first "/" is validated as a domain name.
251 elif not validators.domain(domain.split("/")[0]):
252 raise ValueError(f"domain='{domain}' is not a valid domain")
253 elif domain.endswith(".arpa"):
254 raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
255 elif domain.endswith(".tld"):
256 raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
257 elif not isinstance(path, str):
258 raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
# Condition on original line 259 is not visible; presumably `path == ""`.
260 raise ValueError("Parameter 'path' is empty")
261 elif not isinstance(headers, dict):
262 raise ValueError(f"headers[]='{type(headers)}' is not 'dict'")
263 elif not isinstance(timeout, tuple):
264 raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
267 # DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
268 response = reqto.get(
269 f"https://{domain}{path}",
# Stored cookies for the domain are sent when there are any.
272 cookies=cookies.get_all(domain) if cookies.has(domain) else {}
275 except exceptions as exception:
276 # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
# The failure is recorded for the domain.
277 instances.set_last_error(domain, exception)
280 # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
def json_from_response(response: requests.models.Response) -> list:
    """Decode the JSON body of ``response``, tolerating empty or invalid bodies.

    Parameters:
        response: The HTTP response whose body should be decoded.

    Returns:
        Whatever ``response.json()`` yields (a list or dict for typical API
        replies, despite the ``list`` annotation), or an empty list when the
        body is blank or is not valid JSON.

    Raises:
        ValueError: If ``response`` is not a ``requests.models.Response``.

    Decode failures are caught as ``ValueError``. Both the standard library's
    and simplejson's ``JSONDecodeError`` derive from it, so the best-effort
    fallback works whichever JSON backend ``requests`` is using. The previous
    handler caught only ``json.decoder.JSONDecodeError``.

    NOTE(review): the reviewed listing did not show the initial value of the
    result, the handler body or the return line. An empty-list default and a
    no-op handler are reconstructed here -- confirm against the complete file.
    """
    # DEBUG: print(f"DEBUG: response[]='{type(response)}' - CALLED!")
    if not isinstance(response, requests.models.Response):
        raise ValueError(f"Parameter response[]='{type(response)}' is not type of 'Response'")

    data = []
    if response.text.strip() != "":
        # DEBUG: print(f"DEBUG: response.text()={len(response.text)} is not empty, invoking response.json() ...")
        try:
            data = response.json()
        except ValueError:
            # Deliberate best effort: an undecodable body counts as empty.
            pass

    # DEBUG: print(f"DEBUG: data[]='{type(data)}' - EXIT!")
    return data