# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import bs4
+import json
import reqto
import requests
from fba import config
-from fba import csrf
from fba import fba
from fba import instances
"Content-Type": "application/json",
}
-def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
- # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!")
+# Exceptions to always catch
+exceptions = (
+ requests.exceptions.ConnectionError,
+ requests.exceptions.Timeout,
+ requests.exceptions.TooManyRedirects,
+ UnicodeEncodeError
+)
+
+def post_json_api(domain: str, path: str, data: str, headers: dict = {}) -> dict:
+ # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!")
if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
elif not isinstance(path, str):
- raise ValueError(f"path[]={type(path)} is not 'str'")
+ raise ValueError(f"path[]='{type(path)}' is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' cannot be empty")
- elif not isinstance(parameter, str):
- raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
+ elif not isinstance(data, str):
+ raise ValueError(f"data[]='{type(data)}' is not 'str'")
+ elif not isinstance(headers, dict):
+ raise ValueError(f"headers[]='{type(headers)}' is not 'list'")
- # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
- headers = csrf.determine(domain, {**api_headers, **extra_headers})
-
- data = {}
+ json_reply = {
+ "status_code": 200,
+ }
try:
- # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',parameter='{parameter}',extra_headers({len(extra_headers)})={extra_headers}")
+ # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',data='{data}',headers({len(headers)})={headers}")
response = reqto.post(
f"https://{domain}{path}",
- data=parameter,
- headers=headers,
+ data=data,
+ headers={**api_headers, **headers},
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
- data = fba.json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ json_reply["json"] = json_from_response(response)
+
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
if not response.ok or response.status_code >= 400:
- print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
+ print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',data()={len(data)},response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ json_reply["status_code"] = response.status_code
+ json_reply["error_message"] = response.reason
+ del json_reply["json"]
instances.update_last_error(domain, response)
- except BaseException as exception:
- print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(exception)}]:'{str(exception)}'")
+ except exceptions as exception:
+ # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+ json_reply["status_code"] = 999
+ json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
+ json_reply["exception"] = exception
+ instances.update_last_error(domain, exception)
+ raise exception
- # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
- return data
+ # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ return json_reply
+
+def fetch_api_url(url: str, timeout: tuple) -> dict:
+ # DEBUG: print(f"DEBUG: url='{url}',timeout()={len(timeout)} - CALLED!")
+ if not isinstance(url, str):
+ raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+ elif not isinstance(timeout, tuple):
+ raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
+
+ json_reply = {
+ "status_code": 200,
+ }
+
+ try:
+ # DEBUG: print(f"DEBUG: Fetching url='{url}' ...")
+ response = fba.fetch_url(url, api_headers, timeout)
+
+ json_reply["json"] = json_from_response(response)
+
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
+ if not response.ok or response.status_code >= 400:
+ print(f"WARNING: Cannot query JSON API: url='{url}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ json_reply["status_code"] = response.status_code
+ json_reply["error_message"] = response.reason
+ del json_reply["json"]
+
+ except exceptions as exception:
+ # DEBUG: print(f"DEBUG: Fetching '{url}' failed. exception[{type(exception)}]='{str(exception)}'")
+ json_reply["status_code"] = 999
+ json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
+ json_reply["exception"] = exception
+ raise exception
+
+ # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ return json_reply
+
+def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
+ # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!")
+ if not isinstance(domain, str):
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ elif domain == "":
+ raise ValueError("Parameter 'domain' is empty")
+ elif not isinstance(path, str):
+ raise ValueError(f"path[]='{type(path)}' is not 'str'")
+ elif path == "":
+ raise ValueError("Parameter 'path' cannot be empty")
+ elif not isinstance(headers, dict):
+ raise ValueError(f"headers[]='{type(headers)}' is not 'list'")
+ elif not isinstance(timeout, tuple):
+ raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
+
+ json_reply = {
+ "status_code": 200,
+ }
+
+ try:
+ # DEBUG: print(f"DEBUG: Sending GET to domain='{domain}',path='{path}',timeout({len(timeout)})={timeout}")
+ response = reqto.get(
+ f"https://{domain}{path}",
+ headers={**api_headers, **headers},
+ timeout=timeout
+ )
+
+ except exceptions as exception:
+ # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+ json_reply["status_code"] = 999
+ json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
+ json_reply["exception"] = exception
+ instances.update_last_error(domain, exception)
+ raise exception
+
+ json_reply["json"] = json_from_response(response)
+
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
+ if not response.ok or response.status_code >= 400:
+ print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ json_reply["status_code"] = response.status_code
+ json_reply["error_message"] = response.reason
+ del json_reply["json"]
+ instances.update_last_error(domain, response)
+
+ # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ return json_reply
def send_bot_post(domain: str, blocklist: dict):
# DEBUG: print(f"DEBUG: domain={domain},blocklist()={len(blocklist)} - CALLED!")
return True
-def fetch_friendica_blocks(domain: str) -> dict:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
-
- # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
- blocked = list()
-
- try:
- doc = bs4.BeautifulSoup(
- fetch_response(domain, "/friendica", web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text,
- "html.parser",
- )
- except BaseException as exception:
- print("WARNING: Failed to fetch /friendica from domain:", domain, exception)
- instances.update_last_error(domain, exception)
- return {}
-
- blocklist = doc.find(id="about_blocklist")
-
- # Prevents exceptions:
- if blocklist is None:
- # DEBUG: print("DEBUG: Instance has no block list:", domain)
- return {}
-
- table = blocklist.find("table")
-
- # DEBUG: print(f"DEBUG: table[]='{type(table)}'")
- if table.find("tbody"):
- rows = table.find("tbody").find_all("tr")
- else:
- rows = table.find_all("tr")
-
- # DEBUG: print(f"DEBUG: Found rows()={len(rows)}")
- for line in rows:
- # DEBUG: print(f"DEBUG: line='{line}'")
- blocked.append({
- "domain": fba.tidyup_domain(line.find_all("td")[0].text),
- "reason": fba.tidyup_reason(line.find_all("td")[1].text)
- })
- # DEBUG: print("DEBUG: Next!")
-
- # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
- return {
- "reject": blocked
- }
-
-def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
+def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response:
# DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' is empty")
-
- # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}',headers()='{len(headers)}' ...")
- headers = csrf.determine(domain, headers)
+ elif not isinstance(headers, dict):
+ raise ValueError(f"headers[]='{type(headers)}' is not 'dict'")
+ elif not isinstance(timeout, tuple):
+ raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
try:
# DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
timeout=timeout
)
- except requests.exceptions.ConnectionError as exception:
+ except exceptions as exception:
# DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
instances.update_last_error(domain, exception)
raise exception
# DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
return response
+
+def json_from_response(response: requests.models.Response) -> list:
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}' - CALLED!")
+ if not isinstance(response, requests.models.Response):
+ raise ValueError(f"Parameter response[]='{type(response)}' is not type of 'Response'")
+
+ data = list()
+ if response.text.strip() != "":
+ # DEBUG: print(f"DEBUG: response.text()={len(response.text)} is not empty, invoking response.json() ...")
+ try:
+ data = response.json()
+ except json.decoder.JSONDecodeError:
+ pass
+
+ # DEBUG: print(f"DEBUG: data[]='{type(data)}' - EXIT!")
+ return data