]> git.mxchange.org Git - fba.git/blob - fba/network.py
Fixed some issues found by pylint:
[fba.git] / fba / network.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import bs4
18 import reqto
19 import requests
20
21 from fba import config
22 from fba import instances
23
24 def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
25     # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!")
26     if not isinstance(domain, str):
27         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
28     elif domain == "":
29         raise ValueError("Parameter 'domain' is empty")
30     elif not isinstance(path, str):
31         raise ValueError(f"path[]={type(path)} is not 'str'")
32     elif path == "":
33         raise ValueError("Parameter 'path' cannot be empty")
34     elif not isinstance(parameter, str):
35         raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
36
37     # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers)
38     data = {}
39     try:
40         response = reqto.post(
41             f"https://{domain}{path}",
42             data=parameter,
43             headers={**api_headers, **extra_headers},
44             timeout=(config.get("connection_timeout"), config.get("read_timeout"))
45         )
46
47         data = json_from_response(response)
48         # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
49         if not response.ok or response.status_code >= 400:
50             print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
51             instances.update_last_error(domain, response)
52
53     except BaseException as exception:
54         print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(exception)}]:'{str(exception)}'")
55
56     # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
57     return data
58
59 def send_bot_post(instance: str, blocklist: dict):
60     # DEBUG: print(f"DEBUG: instance={instance},blocklist()={len(blocklist)} - CALLED!")
61     if not isinstance(domain, str):
62         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
63     elif domain == "":
64         raise ValueError("Parameter 'domain' is empty")
65     elif not isinstance(blocklist, dict):
66         raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'")
67
68     message = f"{instance} has blocked the following instances:\n\n"
69     truncated = False
70
71     if len(blocklist) > 20:
72         truncated = True
73         blocklist = blocklist[0 : 19]
74
75     # DEBUG: print(f"DEBUG: blocklist()={len(blocklist)}")
76     for block in blocklist:
77         # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
78         if block["reason"] is None or block["reason"] == '':
79             message = message + block["blocked"] + " with unspecified reason\n"
80         else:
81             if len(block["reason"]) > 420:
82                 block["reason"] = block["reason"][0:419] + "[…]"
83
84             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
85
86     if truncated:
87         message = message + "(the list has been truncated to the first 20 entries)"
88
89     botheaders = {**api_headers, **{"Authorization": "Bearer " + config.get("bot_token")}}
90
91     req = reqto.post(
92         f"{config.get('bot_instance')}/api/v1/statuses",
93         data={
94             "status"      : message,
95             "visibility"  : config.get('bot_visibility'),
96             "content_type": "text/plain"
97         },
98         headers=botheaders,
99         timeout=10
100     ).json()
101
102     return True
103
104 def fetch_friendica_blocks(domain: str) -> dict:
105     # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
106     if not isinstance(domain, str):
107         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
108     elif domain == "":
109         raise ValueError("Parameter 'domain' is empty")
110
111     # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
112     blocked = list()
113
114     try:
115         doc = bs4.BeautifulSoup(
116             fetch_response(domain, "/friendica", headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
117             "html.parser",
118         )
119     except BaseException as exception:
120         print("WARNING: Failed to fetch /friendica from domain:", domain, exception)
121         instances.update_last_error(domain, exception)
122         return {}
123
124     blocklist = doc.find(id="about_blocklist")
125
126     # Prevents exceptions:
127     if blocklist is None:
128         # DEBUG: print("DEBUG: Instance has no block list:", domain)
129         return {}
130
131     table = blocklist.find("table")
132
133     # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
134     if table.find("tbody"):
135         rows = table.find("tbody").find_all("tr")
136     else:
137         rows = table.find_all("tr")
138
139     # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
140     for line in rows:
141         # DEBUG: print(f"DEBUG: line='{line}'")
142         blocked.append({
143             "domain": tidyup_domain(line.find_all("td")[0].text),
144             "reason": tidyup_reason(line.find_all("td")[1].text)
145         })
146         # DEBUG: print("DEBUG: Next!")
147
148     # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
149     return {
150         "reject": blocked
151     }
152
153 def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
154     # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
155     if not isinstance(domain, str):
156         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
157     elif domain == "":
158         raise ValueError("Parameter 'domain' is empty")
159     elif not isinstance(path, str):
160         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
161     elif path == "":
162         raise ValueError("Parameter 'path' is empty")
163
164     try:
165         # DEBUG: print(f"DEBUG: Sending request to '{domain}{path}' ...")
166         response = reqto.get(
167             f"https://{domain}{path}",
168             headers=headers,
169             timeout=timeout
170         );
171     except requests.exceptions.ConnectionError as exception:
172         # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
173         instances.update_last_error(domain, exception)
174         raise exception
175
176     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
177     return response