X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=fba%2Ffederation.py;h=5b4c4ba8963f0c3d9fbca5638998577581f03edf;hb=96f9cee8f4bdd14be4e0f58636a9d8980c880293;hp=92669c50cea2356f67e0ec8d0a1ba47889946154;hpb=6bc99dfe2757e1e7275befeee91d7ac66ac7869a;p=fba.git diff --git a/fba/federation.py b/fba/federation.py index 92669c5..5b4c4ba 100644 --- a/fba/federation.py +++ b/fba/federation.py @@ -44,19 +44,19 @@ nodeinfo_identifier = [ def fetch_instances(domain: str, origin: str, software: str, script: str, path: str = None): # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(origin, str) and origin is not None: - raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'") + raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif software is None: # DEBUG: print(f"DEBUG: software for domain='{domain}' is not set, determining ...") software = determine_software(domain, path) # DEBUG: print(f"DEBUG: Determined software='{software}' for domain='{domain}'") elif not isinstance(software, str): - raise ValueError(f"Parameter software[]={type(software)} is not 'str'") + raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'") elif not isinstance(script, str): - raise ValueError(f"Parameter script[]={type(script)} is not 'str'") + raise ValueError(f"Parameter script[]='{type(script)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") @@ -104,11 +104,11 @@ def fetch_instances(domain: str, origin: str, software: str, script: str, path: def fetch_peers(domain: str, software: str) -> list: # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(software, str) and software is not None: - raise ValueError(f"software[]={type(software)} is not 'str'") + raise ValueError(f"software[]='{type(software)}' is not 'str'") if software == "misskey": # DEBUG: print(f"DEBUG: Invoking misskey.fetch_peers({domain}) ...") @@ -120,8 +120,16 @@ def fetch_peers(domain: str, software: str) -> list: # DEBUG: print(f"DEBUG: Invoking peertube.fetch_peers({domain}) ...") return peertube.fetch_peers(domain) + # Init peers variable + peers = list() + headers = tuple() + # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") - headers = csrf.determine(domain, dict()) + try: + headers = csrf.determine(domain, dict()) + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during checking CSRF - EXIT!") + return # DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...") data = network.get_json_api( @@ -130,13 +138,14 @@ def fetch_peers(domain: str, software: str) -> list: headers, (config.get("connection_timeout"), config.get("read_timeout")) ) - # DEBUG: print(f"DEBUG: data[]='{type(data)}'") + # DEBUG: print(f"DEBUG: data[]='{type(data)}'") if "error_message" in data: # DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...") data = network.get_json_api( domain, "/api/v3/site", + headers, (config.get("connection_timeout"), config.get("read_timeout")) ) @@ -148,8 +157,9 @@ def fetch_peers(domain: str, software: str) -> list: peers = peers + add_peers(data["json"]["federated_instances"]) # DEBUG: print("DEBUG: Added instance(s) to peers") else: - print("WARNING: JSON response does not contain 'federated_instances':", domain) - instances.update_last_error(domain, data) + message = "JSON response does not contain 'federated_instances' or 'error_message'" + print(f"WARNING: {message},domain='{domain}'") + instances.update_last_error(domain, message) else: # DEBUG: print("DEBUG: Querying API was successful:", domain, len(data)) peers = data["json"] @@ -166,11 +176,11 @@ def fetch_peers(domain: str, software: str) -> list: def fetch_nodeinfo(domain: str, path: str = None) -> list: # DEBUG: print(f"DEBUG: domain='{domain}',path={path} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(path, str) and path is not None: - raise ValueError(f"Parameter path[]={type(path)} is not 'str'") + raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") # DEBUG: print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...") nodeinfo = fetch_wellknown_nodeinfo(domain) @@ -180,8 +190,14 @@ def fetch_nodeinfo(domain: str, path: str = None) -> list: # DEBUG: print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!") return nodeinfo + headers = tuple() + # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") - headers = csrf.determine(domain, dict()) + try: + headers = csrf.determine(domain, dict()) + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during checking CSRF - EXIT!") + return request_paths = [ "/nodeinfo/2.1.json", @@ -220,12 +236,18 @@ def fetch_nodeinfo(domain: str, path: str = None) -> list: def fetch_wellknown_nodeinfo(domain: str) -> list: # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") + headers = tuple() + # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") - headers = csrf.determine(domain, dict()) + try: + headers = csrf.determine(domain, dict()) + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during checking CSRF - EXIT!") + return # DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain) data = network.get_json_api( @@ -268,11 +290,11 @@ def fetch_wellknown_nodeinfo(domain: str) -> list: def fetch_generator_from_path(domain: str, path: str = "/") -> str: # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(path, str): - raise ValueError(f"path[]={type(path)} is not 'str'") + raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' is empty") @@ -303,7 +325,7 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: print(f"INFO: domain='{domain}' has og:site_name='{software}'") instances.set_data("detection_mode", domain, "SITE_NAME") - # DEBUG: print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and software == "": # DEBUG: print(f"DEBUG: Corrected empty string to None for software of domain='{domain}'") software = None @@ -311,7 +333,7 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") software = fba.remove_version(software) - # DEBUG: print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and " powered by " in software: # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") software = fba.remove_version(fba.strip_powered_by(software)) @@ -331,11 +353,11 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: def determine_software(domain: str, path: str = None) -> str: # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(path, str) and path is not None: - raise ValueError(f"Parameter path[]={type(path)} is not 'str'") + raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") # DEBUG: print("DEBUG: Determining software for domain,path:", domain, path) software = None @@ -392,12 +414,12 @@ def determine_software(domain: str, path: str = None) -> str: # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it") software = fba.strip_until(software, " see ") - # DEBUG: print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if software == "": print("WARNING: tidyup.domain() left no software name behind:", domain) software = None - # DEBUG: print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if str(software) == "": # DEBUG: print(f"DEBUG: software for '{domain}' was not detected, trying generator ...") software = fetch_generator_from_path(domain) @@ -405,7 +427,7 @@ def determine_software(domain: str, path: str = None) -> str: # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") software = fba.remove_version(software) - # DEBUG: print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and "powered by" in software: # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") software = fba.remove_version(fba.strip_powered_by(software)) @@ -414,15 +436,15 @@ def determine_software(domain: str, path: str = None) -> str: return software def find_domains(tag: bs4.element.Tag) -> list: - # DEBUG: print(f"DEBUG: tag[]={type(tag)} - CALLED!") + # DEBUG: print(f"DEBUG: tag[]='{type(tag)}' - CALLED!") if not isinstance(tag, bs4.element.Tag): - raise ValueError(f"Parameter tag[]={type(tag)} is not type of bs4.element.Tag") + raise ValueError(f"Parameter tag[]='{type(tag)}' is not type of bs4.element.Tag") elif len(tag.select("tr")) == 0: raise KeyError("No table rows found in table!") domains = list() for element in tag.select("tr"): - # DEBUG: print(f"DEBUG: element[]={type(element)}") + # DEBUG: print(f"DEBUG: element[]='{type(element)}'") if not element.find("td"): # DEBUG: print("DEBUG: Skipping element, no found") continue