X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Fnetwork.py;h=18e9437b1c7a9ccaaada26494ab792bd428f6c04;hb=c8b6b6a4aea21b3fefc8fd4900391d59751ff814;hp=a60e0e89782c4c5c32793bbff6a9d8644481a151;hpb=bf40c5220d0def178c31df888ddeb5113481c52c;p=fba.git diff --git a/fba/network.py b/fba/network.py index a60e0e8..18e9437 100644 --- a/fba/network.py +++ b/fba/network.py @@ -14,58 +14,174 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import bs4 +import json import reqto import requests from fba import config +from fba import fba from fba import instances -def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict: - # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!") - if type(domain) != str: - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") +# HTTP headers for non-API requests +web_headers = { + "User-Agent": config.get("useragent"), +} + +# HTTP headers for API requests +api_headers = { + "User-Agent" : config.get("useragent"), + "Content-Type": "application/json", +} + +# Exceptions to always catch +exceptions = ( + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + requests.exceptions.TooManyRedirects, + UnicodeEncodeError +) + +def post_json_api(domain: str, path: str, data: str, headers: dict = {}) -> dict: + # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!") + if not isinstance(domain, str): + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": - raise ValueError(f"Parameter 'domain' is empty") - elif type(path) != str: - raise ValueError(f"path[]={type(path)} is not 'str'") + raise ValueError("Parameter 'domain' is empty") + elif not isinstance(path, str): + raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' cannot be empty") - elif type(parameter) != str: - raise ValueError(f"parameter[]={type(parameter)} is not 'str'") + elif not isinstance(data, str): + raise ValueError(f"data[]='{type(data)}' is not 'str'") + elif not isinstance(headers, dict): + raise ValueError(f"headers[]='{type(headers)}' is not 'list'") + + json_reply = { + "status_code": 200, + } - # DEBUG: print("DEBUG: Sending POST to domain,path,parameter:", domain, path, parameter, extra_headers) - data = {} try: + # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',data='{data}',headers({len(headers)})={headers}") response = reqto.post( f"https://{domain}{path}", - data=parameter, - headers={**api_headers, **extra_headers}, + data=data, + headers={**api_headers, **headers}, timeout=(config.get("connection_timeout"), config.get("read_timeout")) ) - data = json_from_response(response) - # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") + json_reply["json"] = json_from_response(response) + + # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'") if not response.ok or response.status_code >= 400: - print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'") + print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',data()={len(data)},response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'") + json_reply["status_code"] = response.status_code + json_reply["error_message"] = response.reason + del json_reply["json"] instances.update_last_error(domain, response) - except BaseException as e: - print(f"WARNING: Some error during post(): domain='{domain}',path='{path}',parameter()={len(parameter)},exception[{type(e)}]:'{str(e)}'") + except exceptions as exception: + # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'") + json_reply["status_code"] = 999 + json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'" + json_reply["exception"] = exception + instances.update_last_error(domain, exception) + raise exception - # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}") - return data + # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}") + return json_reply + +def fetch_api_url(url: str, timeout: tuple) -> dict: + # DEBUG: print(f"DEBUG: url='{url}',timeout()={len(timeout)} - CALLED!") + if not isinstance(url, str): + raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'") + elif not isinstance(timeout, tuple): + raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'") + + json_reply = { + "status_code": 200, + } + + try: + # DEBUG: print(f"DEBUG: Fetching url='{url}' ...") + response = fba.fetch_url(url, api_headers, timeout) + + json_reply["json"] = json_from_response(response) + + # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'") + if not response.ok or response.status_code >= 400: + print(f"WARNING: Cannot query JSON API: url='{url}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'") + json_reply["status_code"] = response.status_code + json_reply["error_message"] = response.reason + del json_reply["json"] + + except exceptions as exception: + # DEBUG: print(f"DEBUG: Fetching '{url}' failed. exception[{type(exception)}]='{str(exception)}'") + json_reply["status_code"] = 999 + json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'" + json_reply["exception"] = exception + raise exception + + # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}") + return json_reply + +def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict: + # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!") + if not isinstance(domain, str): + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") + elif domain == "": + raise ValueError("Parameter 'domain' is empty") + elif not isinstance(path, str): + raise ValueError(f"path[]='{type(path)}' is not 'str'") + elif path == "": + raise ValueError("Parameter 'path' cannot be empty") + elif not isinstance(headers, dict): + raise ValueError(f"headers[]='{type(headers)}' is not 'list'") + elif not isinstance(timeout, tuple): + raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'") -def send_bot_post(instance: str, blocklist: dict): - # DEBUG: print(f"DEBUG: instance={instance},blocklist()={len(blocklist)} - CALLED!") - if type(domain) != str: + json_reply = { + "status_code": 200, + } + + try: + # DEBUG: print(f"DEBUG: Sending GET to domain='{domain}',path='{path}',timeout({len(timeout)})={timeout}") + response = reqto.get( + f"https://{domain}{path}", + headers={**api_headers, **headers}, + timeout=timeout + ) + + except exceptions as exception: + # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'") + json_reply["status_code"] = 999 + json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'" + json_reply["exception"] = exception + instances.update_last_error(domain, exception) + raise exception + + json_reply["json"] = json_from_response(response) + + # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'") + if not response.ok or response.status_code >= 400: + print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'") + json_reply["status_code"] = response.status_code + json_reply["error_message"] = response.reason + del json_reply["json"] + instances.update_last_error(domain, response) + + # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}") + return json_reply + +def send_bot_post(domain: str, blocklist: dict): + # DEBUG: print(f"DEBUG: domain={domain},blocklist()={len(blocklist)} - CALLED!") + if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") - elif type(blocklist) != dict: + elif not isinstance(blocklist, dict): raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'") - message = instance + " has blocked the following instances:\n\n" + message = f"{domain} has blocked the following instances:\n\n" truncated = False if len(blocklist) > 20: @@ -75,7 +191,7 @@ def send_bot_post(instance: str, blocklist: dict): # DEBUG: print(f"DEBUG: blocklist()={len(blocklist)}") for block in blocklist: # DEBUG: print(f"DEBUG: block['{type(block)}']={block}") - if block["reason"] == None or block["reason"] == '': + if block["reason"] is None or block["reason"] == '': message = message + block["blocked"] + " with unspecified reason\n" else: if len(block["reason"]) > 420: @@ -101,77 +217,49 @@ def send_bot_post(instance: str, blocklist: dict): return True -def fetch_friendica_blocks(domain: str) -> dict: - # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!") - if type(domain) != str: - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") - elif domain == "": - raise ValueError(f"Parameter 'domain' is empty") - - # DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain) - blocked = list() - - try: - doc = bs4.BeautifulSoup( - fetch_response(domain, "/friendica", headers, (config.get("connection_timeout"), config.get("read_timeout"))).text, - "html.parser", - ) - except BaseException as e: - print("WARNING: Failed to fetch /friendica from domain:", domain, e) - instances.update_last_error(domain, e) - return {} - - blocklist = doc.find(id="about_blocklist") - - # Prevents exceptions: - if blocklist is None: - # DEBUG: print("DEBUG: Instance has no block list:", domain) - return {} - - table = blocklist.find("table") - - # DEBUG: print(f"DEBUG: table[]='{type(table)}'") - if table.find("tbody"): - rows = table.find("tbody").find_all("tr") - else: - rows = table.find_all("tr") - - # DEBUG: print(f"DEBUG: Found rows()={len(rows)}") - for line in rows: - # DEBUG: print(f"DEBUG: line='{line}'") - blocked.append({ - "domain": tidyup_domain(line.find_all("td")[0].text), - "reason": tidyup_reason(line.find_all("td")[1].text) - }) - # DEBUG: print("DEBUG: Next!") - - # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist)) - return { - "reject": blocked - } - -def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response: +def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response: # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!") - if type(domain) != str: + if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") - elif type(path) != str: + elif not isinstance(path, str): raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' is empty") + elif not isinstance(headers, dict): + raise ValueError(f"headers[]='{type(headers)}' is not 'dict'") + elif not isinstance(timeout, tuple): + raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'") try: - # DEBUG: print(f"DEBUG: Sending request to '{domain}{path}' ...") + # DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...") response = reqto.get( f"https://{domain}{path}", headers=headers, timeout=timeout - ); - except requests.exceptions.ConnectionError as e: - # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(e)}]='{str(e)}'") - instances.update_last_error(domain, e) - raise e + ) + + except exceptions as exception: + # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'") + instances.update_last_error(domain, exception) + raise exception # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!") return response + +def json_from_response(response: requests.models.Response) -> list: + # DEBUG: print(f"DEBUG: response[]='{type(response)}' - CALLED!") + if not isinstance(response, requests.models.Response): + raise ValueError(f"Parameter response[]='{type(response)}' is not type of 'Response'") + + data = list() + if response.text.strip() != "": + # DEBUG: print(f"DEBUG: response.text()={len(response.text)} is not empty, invoking response.json() ...") + try: + data = response.json() + except json.decoder.JSONDecodeError: + pass + + # DEBUG: print(f"DEBUG: data[]='{type(data)}' - EXIT!") + return data