]> git.mxchange.org Git - fba.git/blob - fba/network.py
WIP:
[fba.git] / fba / network.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import bs4
18 import reqto
19 import requests
20
21 from fba import config
22 from fba import csrf
23 from fba import fba
24 from fba import instances
25
26 # HTTP headers for non-API requests
27 web_headers = {
28     "User-Agent": config.get("useragent"),
29 }
30
31 # HTTP headers for API requests
32 api_headers = {
33     "User-Agent"  : config.get("useragent"),
34     "Content-Type": "application/json",
35 }
36
37 def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
38     # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!")
39     if not isinstance(domain, str):
40         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
41     elif domain == "":
42         raise ValueError("Parameter 'domain' is empty")
43     elif not isinstance(path, str):
44         raise ValueError(f"path[]={type(path)} is not 'str'")
45     elif path == "":
46         raise ValueError("Parameter 'path' cannot be empty")
47     elif not isinstance(parameter, str):
48         raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
49
50     # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
51     headers = csrf.determine(domain, {**api_headers, **extra_headers})
52
53     data = {}
54
55     try:
56         # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',parameter='{parameter}',extra_headers({len(extra_headers)})={extra_headers}")
57         response = reqto.post(
58             f"https://{domain}{path}",
59             data=parameter,
60             headers=headers,
61             timeout=(config.get("connection_timeout"), config.get("read_timeout"))
62         )
63
64         data = fba.json_from_response(response)
65         # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
66         if not response.ok or response.status_code >= 400:
67             print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
68             instances.update_last_error(domain, response)
69
70     except BaseException as exception:
71         print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(exception)}]:'{str(exception)}'")
72
73     # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
74     return data
75
76 def send_bot_post(domain: str, blocklist: dict):
77     # DEBUG: print(f"DEBUG: domain={domain},blocklist()={len(blocklist)} - CALLED!")
78     if not isinstance(domain, str):
79         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
80     elif domain == "":
81         raise ValueError("Parameter 'domain' is empty")
82     elif not isinstance(blocklist, dict):
83         raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'")
84
85     message = f"{domain} has blocked the following instances:\n\n"
86     truncated = False
87
88     if len(blocklist) > 20:
89         truncated = True
90         blocklist = blocklist[0 : 19]
91
92     # DEBUG: print(f"DEBUG: blocklist()={len(blocklist)}")
93     for block in blocklist:
94         # DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
95         if block["reason"] is None or block["reason"] == '':
96             message = message + block["blocked"] + " with unspecified reason\n"
97         else:
98             if len(block["reason"]) > 420:
99                 block["reason"] = block["reason"][0:419] + "[…]"
100
101             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
102
103     if truncated:
104         message = message + "(the list has been truncated to the first 20 entries)"
105
106     botheaders = {**api_headers, **{"Authorization": "Bearer " + config.get("bot_token")}}
107
108     req = reqto.post(
109         f"{config.get('bot_instance')}/api/v1/statuses",
110         data={
111             "status"      : message,
112             "visibility"  : config.get('bot_visibility'),
113             "content_type": "text/plain"
114         },
115         headers=botheaders,
116         timeout=10
117     ).json()
118
119     return True
120
121 def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
122     # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
123     if not isinstance(domain, str):
124         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
125     elif domain == "":
126         raise ValueError("Parameter 'domain' is empty")
127     elif not isinstance(path, str):
128         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
129     elif path == "":
130         raise ValueError("Parameter 'path' is empty")
131
132     # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}',headers()='{len(headers)}' ...")
133     headers = csrf.determine(domain, headers)
134
135     try:
136         # DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
137         response = reqto.get(
138             f"https://{domain}{path}",
139             headers=headers,
140             timeout=timeout
141         )
142
143     except requests.exceptions.ConnectionError as exception:
144         # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
145         instances.update_last_error(domain, exception)
146         raise exception
147
148     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
149     return response