]> git.mxchange.org Git - fba.git/blob - fba/network.py
Continued:
[fba.git] / fba / network.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import bs4
18 import reqto
19 import requests
20
21 from fba import config
22 from fba import csrf
23 from fba import fba
24 from fba import instances
25
26 # HTTP headers for non-API requests
27 web_headers = {
28     "User-Agent": config.get("useragent"),
29 }
30
31 # HTTP headers for API requests
32 api_headers = {
33     "User-Agent"  : config.get("useragent"),
34     "Content-Type": "application/json",
35 }
36
37 def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
38     # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!")
39     if not isinstance(domain, str):
40         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
41     elif domain == "":
42         raise ValueError("Parameter 'domain' is empty")
43     elif not isinstance(path, str):
44         raise ValueError(f"path[]={type(path)} is not 'str'")
45     elif path == "":
46         raise ValueError("Parameter 'path' cannot be empty")
47     elif not isinstance(parameter, str):
48         raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
49
50     # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
51     headers = csrf.determine(domain, {**api_headers, **extra_headers})
52
53     data = {}
54
55     try:
56         # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',parameter='{parameter}',extra_headers({len(extra_headers)})={extra_headers}")
57         response = reqto.post(
58             f"https://{domain}{path}",
59             data=parameter,
60             headers=headers,
61             timeout=(config.get("connection_timeout"), config.get("read_timeout"))
62         )
63
64         data = fba.json_from_response(response)
65         # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
66         if not response.ok or response.status_code >= 400:
67             print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
68             instances.update_last_error(domain, response)
69
70     except BaseException as exc:
71         print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exc[{type(exc)}]:'{str(exc)}'")
72
73     # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
74     return data
75
76 def send_bot_post(domain: str, blocklist: dict):
77     # DEBUG: print(f"DEBUG: domain={domain},blocklist()={len(blocklist)} - CALLED!")
78     if not isinstance(domain, str):
79         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
80     elif domain == "":
81         raise ValueError("Parameter 'domain' is empty")
82     elif not isinstance(blocklist, dict):
83         raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'")
84
85     message = f"{domain} has blocked the following instances:\n\n"
86     truncated = False
87
88     if len(blocklist) > 20:
89         truncated = True
90         blocklist = blocklist[0 : 19]
91
92     # DEBUG: print(f"DEBUG: blocklist()={len(blocklist)}")
93     for block in blocklist:
94         # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
95         if block["reason"] is None or block["reason"] == '':
96             message = message + block["blocked"] + " with unspecified reason\n"
97         else:
98             if len(block["reason"]) > 420:
99                 block["reason"] = block["reason"][0:419] + "[…]"
100
101             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
102
103     if truncated:
104         message = message + "(the list has been truncated to the first 20 entries)"
105
106     botheaders = {**api_headers, **{"Authorization": "Bearer " + config.get("bot_token")}}
107
108     req = reqto.post(
109         f"{config.get('bot_instance')}/api/v1/statuses",
110         data={
111             "status"      : message,
112             "visibility"  : config.get('bot_visibility'),
113             "content_type": "text/plain"
114         },
115         headers=botheaders,
116         timeout=10
117     ).json()
118
119     return True
120
121 def fetch_friendica_blocks(domain: str) -> dict:
122     # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
123     if not isinstance(domain, str):
124         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
125     elif domain == "":
126         raise ValueError("Parameter 'domain' is empty")
127
128     # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
129     blocked = list()
130
131     try:
132         doc = bs4.BeautifulSoup(
133             fetch_response(domain, "/friendica", web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
134             "html.parser",
135         )
136     except BaseException as exc:
137         print("WARNING: Failed to fetch /friendica from domain:", domain, exc)
138         instances.update_last_error(domain, exc)
139         return {}
140
141     blocklist = doc.find(id="about_blocklist")
142
143     # Prevents exceptions:
144     if blocklist is None:
145         # DEBUG: print("DEBUG: Instance has no block list:", domain)
146         return {}
147
148     table = blocklist.find("table")
149
150     # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
151     if table.find("tbody"):
152         rows = table.find("tbody").find_all("tr")
153     else:
154         rows = table.find_all("tr")
155
156     # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
157     for line in rows:
158         # DEBUG: print(f"DEBUG: line='{line}'")
159         blocked.append({
160             "domain": fba.tidyup_domain(line.find_all("td")[0].text),
161             "reason": fba.tidyup_reason(line.find_all("td")[1].text)
162         })
163         # DEBUG: print("DEBUG: Next!")
164
165     # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
166     return {
167         "reject": blocked
168     }
169
170 def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
171     # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
172     if not isinstance(domain, str):
173         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
174     elif domain == "":
175         raise ValueError("Parameter 'domain' is empty")
176     elif not isinstance(path, str):
177         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
178     elif path == "":
179         raise ValueError("Parameter 'path' is empty")
180
181     # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}',headers()='{len(headers)}' ...")
182     headers = csrf.determine(domain, headers)
183
184     try:
185         # DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
186         response = reqto.get(
187             f"https://{domain}{path}",
188             headers=headers,
189             timeout=timeout
190         )
191
192     except requests.exceptions.ConnectionError as exc:
193         # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exc[{type(exc)}]='{str(exc)}'")
194         instances.update_last_error(domain, exc)
195         raise exc
196
197     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
198     return response