1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 from fba import config
24 from fba import instances
# HTTP headers for non-API requests
web_headers = {
    "User-Agent": config.get("useragent"),
}

# HTTP headers for API requests
api_headers = {
    "User-Agent": config.get("useragent"),
    "Content-Type": "application/json",
}
def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = None) -> dict:
    """POST a request body to an instance's JSON API and return the decoded reply.

    Args:
        domain: Host name of the instance; the request is sent to
            ``https://{domain}{path}``.
        path: Request path, appended directly after the domain.
        parameter: Request body, handed to ``reqto.post()`` unchanged.
        extra_headers: Optional headers merged over ``api_headers``. Defaults
            to no extra headers.

    Returns:
        The value produced by ``fba.json_from_response()`` for the response,
        or an empty dict when sending the request or decoding it raised.

    Raises:
        ValueError: If ``domain``, ``path`` or ``parameter`` is not a string,
            or if ``domain`` or ``path`` is empty.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(path, str):
        raise ValueError(f"path[]={type(path)} is not 'str'")
    elif path == "":
        raise ValueError("Parameter 'path' cannot be empty")
    elif not isinstance(parameter, str):
        raise ValueError(f"parameter[]={type(parameter)} is not 'str'")

    # A None default avoids sharing one dict object between all calls.
    if extra_headers is None:
        extra_headers = {}

    # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
    headers = csrf.determine(domain, {**api_headers, **extra_headers})

    data = {}

    try:
        # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',parameter='{parameter}',extra_headers({len(extra_headers)})={extra_headers}")
        response = reqto.post(
            f"https://{domain}{path}",
            data=parameter,
            headers=headers,
            timeout=(config.get("connection_timeout"), config.get("read_timeout"))
        )

        data = fba.json_from_response(response)
        # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
        if not response.ok or response.status_code >= 400:
            print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
            instances.update_last_error(domain, response)

    except Exception as exc:
        # Best effort: report the failure and fall through to the empty result.
        # Exception (not BaseException) lets Ctrl-C and SystemExit propagate.
        print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exc[{type(exc)}]:'{str(exc)}'")

    # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
    return data
def send_bot_post(domain: str, blocklist: dict):
    """Post a summary of the instances blocked by ``domain`` through the bot account.

    The status lists at most 20 block records; a longer list is cut and a
    note about the truncation is appended. Reasons longer than 420 characters
    are shortened and every "@" is followed by a zero-width space so the post
    does not mention anyone.

    Args:
        domain: Instance whose blocks are being announced.
        blocklist: Block records, each a mapping with the keys ``"blocked"``
            and ``"reason"``. A list or tuple of records is accepted; a dict
            is accepted for backward compatibility with the previous type
            check.

    Returns:
        True once the request has been sent.

    Raises:
        ValueError: If ``domain`` is not a non-empty string or ``blocklist``
            is not a list, tuple or dict.
    """
    # DEBUG: print(f"DEBUG: domain={domain},blocklist()={len(blocklist)} - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(blocklist, (dict, list, tuple)):
        raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'list' or 'dict'")

    # A dict can neither be sliced nor iterated record by record, so it is
    # flattened first.
    if isinstance(blocklist, dict):
        # NOTE(review): assumes the dict's values are the block records - confirm against callers
        entries = list(blocklist.values())
    else:
        entries = list(blocklist)

    truncated = len(entries) > 20
    if truncated:
        # Keep exactly the 20 entries the closing note promises.
        entries = entries[:20]

    # DEBUG: print(f"DEBUG: entries()={len(entries)}")
    parts = [f"{domain} has blocked the following instances:\n\n"]
    for block in entries:
        # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
        # Work on a local copy so the caller's record is never modified.
        reason = block["reason"]
        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > 420:
            reason = reason[0:419] + "[…]"

        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append("(the list has been truncated to the first 20 entries)")

    message = "".join(parts)

    botheaders = {**api_headers, "Authorization": "Bearer " + config.get("bot_token")}

    # NOTE(review): the payload is passed as form data while api_headers
    # announces JSON - confirm this is what the bot instance expects.
    reqto.post(
        f"{config.get('bot_instance')}/api/v1/statuses",
        data={
            "status": message,
            "visibility": config.get('bot_visibility'),
            "content_type": "text/plain"
        },
        headers=botheaders,
        timeout=(config.get("connection_timeout"), config.get("read_timeout"))
    )

    return True
def fetch_friendica_blocks(domain: str) -> dict:
    """Scrape the block list a Friendica instance publishes on its /friendica page.

    The page is fetched through ``fetch_response()`` and the table inside the
    element with id ``about_blocklist`` is read row by row: the first cell is
    the blocked domain, the second the reason.

    Args:
        domain: Host name of the Friendica instance.

    Returns:
        ``{"reject": [...]}`` holding one ``{"domain": ..., "reason": ...}``
        dict per table row, or an empty dict when the page could not be
        fetched or contains no block list table.

    Raises:
        ValueError: If ``domain`` is not a non-empty string.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    blocklist = []

    try:
        doc = bs4.BeautifulSoup(
            fetch_response(domain, "/friendica", web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
            "html.parser",
        )
    except Exception as exc:
        print("WARNING: Failed to fetch /friendica from domain:", domain, exc)
        instances.update_last_error(domain, exc)
        return {}

    # Keep the parsed element in its own name; reusing 'blocklist' here would
    # replace the result list with a markup node.
    about = doc.find(id="about_blocklist")

    # Prevents exceptions:
    if about is None:
        # DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    table = about.find("table")

    # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
    if table is None:
        # The section exists but holds no table, so there is nothing to read.
        return {}

    tbody = table.find("tbody")
    rows = (table if tbody is None else tbody).find_all("tr")

    # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
    for line in rows:
        # DEBUG: print(f"DEBUG: line='{line}'")
        cells = line.find_all("td")
        if len(cells) < 2:
            # Header or malformed rows carry no domain/reason pair.
            continue

        blocklist.append({
            "domain": fba.tidyup_domain(cells[0].text),
            "reason": fba.tidyup_reason(cells[1].text)
        })
        # DEBUG: print("DEBUG: Next!")

    # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
    return {
        "reject": blocklist
    }
def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Send a GET request to ``https://{domain}{path}`` and return the response.

    Args:
        domain: Host name of the instance.
        path: Request path, appended directly after the domain.
        headers: Request headers; they are passed through
            ``csrf.determine()`` before the request is sent.
        timeout: Timeout value handed to ``reqto.get()``; callers in this
            module pass a ``(connection_timeout, read_timeout)`` tuple.

    Returns:
        The response object returned by ``reqto.get()``.

    Raises:
        ValueError: If ``domain`` or ``path`` is not a non-empty string.
        requests.exceptions.ConnectionError: Re-raised after the error has
            been recorded with ``instances.update_last_error()``.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(path, str):
        raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
    elif path == "":
        raise ValueError("Parameter 'path' is empty")

    # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}',headers()='{len(headers)}' ...")
    headers = csrf.determine(domain, headers)

    try:
        # DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
        response = reqto.get(
            f"https://{domain}{path}",
            headers=headers,
            timeout=timeout
        )

    except requests.exceptions.ConnectionError as exc:
        # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exc[{type(exc)}]='{str(exc)}'")
        instances.update_last_error(domain, exc)
        # Bare raise keeps the original traceback for the caller.
        raise

    # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXIT!")
    return response