1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 from fba import config
22 from fba import instances
def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = None) -> dict:
    """POST ``parameter`` to ``https://{domain}{path}`` and return the decoded reply.

    The request is sent with the module-level ``api_headers`` merged with
    ``extra_headers`` (the latter wins on duplicate keys) and uses the
    connection/read timeouts from the configuration. The call is best-effort:
    request or decoding errors are reported with a printed warning instead of
    being raised.

    Args:
        domain: Host to query; must be a non-empty string.
        path: Request path appended directly to ``domain``; must be a
            non-empty string.
        parameter: Request body; must be a string.
        extra_headers: Optional additional headers. Defaults to none.

    Returns:
        Whatever ``json_from_response()`` produced for the response, or an
        empty dict when the request failed before anything was decoded.

    Raises:
        ValueError: If ``domain``, ``path`` or ``parameter`` is not a string,
            or if ``domain`` or ``path`` is empty.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(path, str):
        raise ValueError(f"path[]={type(path)} is not 'str'")
    elif path == "":
        raise ValueError("Parameter 'path' cannot be empty")
    elif not isinstance(parameter, str):
        raise ValueError(f"parameter[]={type(parameter)} is not 'str'")

    # A None default avoids sharing one mutable dict between all calls.
    if extra_headers is None:
        extra_headers = {}

    # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
    # Initialised up front so a failed request still returns a dict.
    data = {}
    try:
        response = reqto.post(
            f"https://{domain}{path}",
            data=parameter,
            headers={**api_headers, **extra_headers},
            timeout=(config.get("connection_timeout"), config.get("read_timeout"))
        )

        data = json_from_response(response)
        # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
        if not response.ok or response.status_code >= 400:
            print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
            instances.update_last_error(domain, response)

    except Exception as e:
        # Exception (not BaseException) so Ctrl-C and SystemExit still
        # terminate the program instead of being reported as a warning.
        print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(e)}]:'{str(e)}'")

    # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
    return data
def send_bot_post(instance: str, blocklist: list):
    """Publish a status listing the instances that ``instance`` has blocked.

    The status text names ``instance`` followed by one line per entry of
    ``blocklist``. At most the first 20 entries are listed; when more are
    given a trailing note says the list was truncated. Reasons longer than
    420 characters are shortened and suffixed with "[…]". A zero-width space
    is inserted after every "@" in a reason (presumably so that handles are
    not rendered as mentions).

    Args:
        instance: Name of the blocking instance; must be a non-empty string.
        blocklist: List of dicts, each providing the keys ``"blocked"`` and
            ``"reason"`` (the reason may be ``None`` or empty).

    Returns:
        ``True`` once the request has been sent.

    Raises:
        ValueError: If ``instance`` is not a non-empty string or ``blocklist``
            is not a list.
    """
    # DEBUG: print(f"DEBUG: instance={instance},blocklist()={len(blocklist)} - CALLED!")
    if not isinstance(instance, str):
        raise ValueError(f"Parameter instance[]='{type(instance)}' is not 'str'")
    elif instance == "":
        raise ValueError("Parameter 'instance' is empty")
    elif not isinstance(blocklist, list):
        # The body slices and iterates the argument as a sequence of dicts,
        # so a list (not a dict) is what must be accepted here.
        raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'list'")

    truncated = len(blocklist) > 20
    parts = [instance + " has blocked the following instances:\n\n"]

    # DEBUG: print(f"DEBUG: blocklist()={len(blocklist)}")
    for block in blocklist[:20]:
        # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
        # Work on a local copy so the caller's entries are never modified.
        reason = block["reason"]
        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > 420:
            reason = reason[:419] + "[…]"

        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append("(the list has been truncated to the first 20 entries)")

    message = "".join(parts)

    botheaders = {**api_headers, "Authorization": "Bearer " + config.get("bot_token")}

    # NOTE(review): the "status" field, the headers/timeout arguments and the
    # return value were reconstructed from a partial view of this function -
    # confirm against the original before merging.
    reqto.post(
        f"{config.get('bot_instance')}/api/v1/statuses",
        data={
            "status"      : message,
            "visibility"  : config.get('bot_visibility'),
            "content_type": "text/plain"
        },
        headers=botheaders,
        timeout=(config.get("connection_timeout"), config.get("read_timeout"))
    )

    return True
def fetch_friendica_blocks(domain: str) -> dict:
    """Scrape the block list published on an instance's ``/friendica`` page.

    The page is fetched through ``fetch_response()`` and parsed with
    BeautifulSoup. The element with id ``about_blocklist`` is searched for a
    table; every table row with at least two ``<td>`` cells contributes one
    entry made of the tidied first cell (domain) and second cell (reason).

    Args:
        domain: Host to query; must be a non-empty string.

    Returns:
        An empty dict when the page cannot be fetched or contains no block
        list table, otherwise a dict holding the collected entries.

    Raises:
        ValueError: If ``domain`` is not a string or is empty.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    blocked = []

    try:
        doc = bs4.BeautifulSoup(
            fetch_response(domain, "/friendica", headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
            "html.parser",
        )
    except Exception as e:
        # Exception (not BaseException) so Ctrl-C and SystemExit are not
        # silently turned into an empty result.
        print("WARNING: Failed to fetch /friendica from domain:", domain, e)
        instances.update_last_error(domain, e)
        return {}

    blocklist = doc.find(id="about_blocklist")

    # Prevents exceptions:
    if blocklist is None:
        # DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    table = blocklist.find("table")

    # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
    if table is None:
        # Block list container without a table: nothing to report.
        return {}

    # Rows may or may not be wrapped in a <tbody>.
    tbody = table.find("tbody")
    rows = (table if tbody is None else tbody).find_all("tr")

    # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
    for line in rows:
        # DEBUG: print(f"DEBUG: line='{line}'")
        cells = line.find_all("td")
        if len(cells) < 2:
            # Header rows (<th> only) or malformed rows carry no entry.
            continue

        blocked.append({
            "domain": tidyup_domain(cells[0].text),
            "reason": tidyup_reason(cells[1].text)
        })
        # DEBUG: print("DEBUG: Next!")

    # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocked))
    # NOTE(review): the "reject" key and the "html.parser" argument were
    # reconstructed from a partial view of this function - confirm against
    # the original before merging.
    return {
        "reject": blocked
    }
def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
    """Send a GET request to ``https://{domain}{path}`` and return the response.

    Args:
        domain: Host to query; must be a non-empty string.
        path: Request path appended directly to ``domain``; must be a
            non-empty string.
        headers: Headers passed through to the request.
        timeout: Timeout value passed through to the request.

    Returns:
        The response object returned by ``reqto.get()``.

    Raises:
        ValueError: If ``domain`` or ``path`` is not a string or is empty.
        requests.exceptions.ConnectionError: Re-raised after the failure has
            been recorded through ``instances.update_last_error()``.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(path, str):
        raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
    elif path == "":
        raise ValueError("Parameter 'path' is empty")

    try:
        # DEBUG: print(f"DEBUG: Sending request to '{domain}{path}' ...")
        response = reqto.get(
            f"https://{domain}{path}",
            headers=headers,
            timeout=timeout
        )
    except requests.exceptions.ConnectionError as e:
        # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(e)}]='{str(e)}'")
        instances.update_last_error(domain, e)
        # Bare raise keeps the original traceback for the caller.
        # NOTE(review): the re-raise and the headers/timeout arguments were
        # reconstructed from a partial view of this function - confirm.
        raise

    # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXIT!")
    return response