1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 from fba import config
23 from fba import instances
# Headers sent with plain (non-API) page requests
web_headers = {
    "User-Agent": config.get("useragent"),
}

# Headers sent with JSON API requests
api_headers = {
    "User-Agent": config.get("useragent"),
    "Content-Type": "application/json",
}
def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = None) -> dict:
    """POST `parameter` to https://{domain}{path} and return the decoded reply.

    The request is sent with `api_headers` merged with `extra_headers`
    (entries in `extra_headers` win on duplicate keys) and with the
    connection/read timeouts taken from the configuration.

    A response that is not OK is reported with a warning and recorded through
    instances.update_last_error(). Errors raised while sending or decoding are
    reported with a warning and not re-raised; in that case whatever has been
    decoded so far (an empty dict if nothing) is returned.

    Args:
        domain: Host name to contact, must be a non-empty string.
        path: Request path appended to the host, must be a non-empty string.
        parameter: Request body, must be a string.
        extra_headers: Optional additional HTTP headers; None means none.

    Returns:
        The value produced by fba.json_from_response() for the response, or
        an empty dict when the request failed before a reply was decoded.

    Raises:
        ValueError: If domain, path or parameter fail the checks above.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers[]='{type(extra_headers)}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(path, str):
        raise ValueError(f"path[]={type(path)} is not 'str'")
    elif path == "":
        raise ValueError("Parameter 'path' cannot be empty")
    elif not isinstance(parameter, str):
        raise ValueError(f"parameter[]={type(parameter)} is not 'str'")

    # A dict literal as default value would be shared between all calls
    if extra_headers is None:
        extra_headers = {}

    # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
    data = {}
    try:
        response = reqto.post(
            f"https://{domain}{path}",
            data=parameter,
            headers={**api_headers, **extra_headers},
            timeout=(config.get("connection_timeout"), config.get("read_timeout"))
        )

        data = fba.json_from_response(response)
        # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
        if not response.ok or response.status_code >= 400:
            print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
            instances.update_last_error(domain, response)

    except Exception as exception:
        # Exception (not BaseException) so that Ctrl-C and sys.exit() still end the program
        print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(exception)}]:'{str(exception)}'")

    # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
    return data
def _build_bot_message(instance: str, blocks: list) -> str:
    """Render the status text listing the instances blocked by `instance`."""
    max_entries = 20
    max_reason_length = 420

    parts = [f"{instance} has blocked the following instances:\n\n"]
    for block in blocks[:max_entries]:
        # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
        reason = block["reason"]
        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > max_reason_length:
            # Shorten a local copy only, the caller's record stays untouched
            reason = reason[:max_reason_length - 1] + "[…]"

        # A zero-width space is inserted after every "@" - presumably to keep
        # handles inside the reason from being turned into mentions
        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if len(blocks) > max_entries:
        parts.append("(the list has been truncated to the first 20 entries)")

    return "".join(parts)

def send_bot_post(instance: str, blocklist: dict):
    """Publish a status through the configured bot account listing new blocks.

    The text names `instance` and then one line per block record, limited to
    the first 20 records. The status is posted to the `/api/v1/statuses`
    endpoint of the configured `bot_instance`, authorised with the configured
    `bot_token` and using the configured `bot_visibility`.

    Args:
        instance: Domain whose blocks are announced, must be a non-empty string.
        blocklist: Block records, each a mapping with the keys "blocked" and
            "reason". A list of records is expected; a dict is accepted as
            well and its values are used as the records.
            NOTE(review): the dict form is kept only because the annotation
            promises it - confirm against the callers.

    Returns:
        True once the request has been sent.

    Raises:
        ValueError: If instance is not a non-empty string or blocklist is
            neither a list nor a dict.
    """
    # DEBUG: print(f"DEBUG: instance={instance},blocklist()={len(blocklist)} - CALLED!")
    if not isinstance(instance, str):
        raise ValueError(f"Parameter instance[]='{type(instance)}' is not 'str'")
    elif instance == "":
        raise ValueError("Parameter 'instance' is empty")
    elif not isinstance(blocklist, (dict, list)):
        raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict' or 'list'")

    blocks = list(blocklist.values()) if isinstance(blocklist, dict) else blocklist

    # DEBUG: print(f"DEBUG: blocks()={len(blocks)}")
    message = _build_bot_message(instance, blocks)

    botheaders = {**api_headers, "Authorization": "Bearer " + config.get("bot_token")}

    reqto.post(
        f"{config.get('bot_instance')}/api/v1/statuses",
        data={
            "status": message,
            "visibility": config.get('bot_visibility'),
            "content_type": "text/plain"
        },
        headers=botheaders,
        timeout=(config.get("connection_timeout"), config.get("read_timeout"))
    )

    return True
def fetch_friendica_blocks(domain: str) -> dict:
    """Scrape the block list published on a Friendica instance's /friendica page.

    The page is fetched with `web_headers` and the configured timeouts and
    parsed as HTML. The element with the id "about_blocklist" is looked up
    and every table row below it that has at least two `<td>` cells yields
    one record: first cell is the blocked domain, second cell the reason.

    Args:
        domain: Host name of the instance, must be a non-empty string.

    Returns:
        {"reject": [{"domain": ..., "reason": ...}, ...]} on success, or an
        empty dict when the page cannot be fetched or holds no block list.

    Raises:
        ValueError: If domain is not a non-empty string.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    try:
        doc = bs4.BeautifulSoup(
            fetch_response(domain, "/friendica", web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
            "html.parser",
        )
    except Exception as exception:
        # Exception (not BaseException) so that Ctrl-C and sys.exit() still end the program
        print("WARNING: Failed to fetch /friendica from domain:", domain, exception)
        instances.update_last_error(domain, exception)
        return {}

    blocklist = doc.find(id="about_blocklist")

    # Prevents exceptions:
    if blocklist is None:
        # DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    table = blocklist.find("table")

    # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
    if table is None:
        # The block list section exists but carries no table to read
        return {}

    # Rows may or may not be wrapped in a <tbody>
    body = table.find("tbody")
    rows = (table if body is None else body).find_all("tr")

    # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
    blocked = []
    for line in rows:
        # DEBUG: print(f"DEBUG: line='{line}'")
        cells = line.find_all("td")
        if len(cells) < 2:
            # Rows without two data cells (e.g. header rows) hold no record
            continue

        blocked.append({
            "domain": tidyup_domain(cells[0].text),
            "reason": tidyup_reason(cells[1].text)
        })

    # DEBUG: print("DEBUG: Returning blocked() for domain:", domain, len(blocked))
    return {
        "reject": blocked
    }
def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
    """Issue an HTTPS GET for `path` on `domain` and hand back the response.

    `headers` and `timeout` are passed through to reqto.get() unchanged.
    A connection error is recorded through instances.update_last_error()
    and then raised again to the caller.

    Raises:
        ValueError: If domain or path is not a non-empty string.
        requests.exceptions.ConnectionError: If the connection fails.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
    # Guard clauses, checked in the order domain -> path
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    if domain == "":
        raise ValueError("Parameter 'domain' is empty")
    if not isinstance(path, str):
        raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
    if path == "":
        raise ValueError("Parameter 'path' is empty")

    try:
        # DEBUG: print(f"DEBUG: Sending request to '{domain}{path}' ...")
        result = reqto.get(
            f"https://{domain}{path}",
            headers=headers,
            timeout=timeout
        )
    except requests.exceptions.ConnectionError as error:
        # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. error[{type(error)}]='{str(error)}'")
        # Remember the failure for this domain before passing it on
        instances.update_last_error(domain, error)
        raise error

    # DEBUG: print(f"DEBUG: result[]='{type(result)}' - EXIT!")
    return result