X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Ffederation.py;h=5b4c4ba8963f0c3d9fbca5638998577581f03edf;hb=96f9cee8f4bdd14be4e0f58636a9d8980c880293;hp=b7e86a21dd5593fa5b8dc38ea6bc82e490c9a11e;hpb=f248d55bfebb9cd7885486637e89ad0b1960899a;p=fba.git diff --git a/fba/federation.py b/fba/federation.py index b7e86a2..5b4c4ba 100644 --- a/fba/federation.py +++ b/fba/federation.py @@ -13,13 +13,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import sys - import bs4 import validators from fba import blacklist from fba import config +from fba import csrf from fba import fba from fba import instances from fba import network @@ -43,47 +42,47 @@ nodeinfo_identifier = [ ] def fetch_instances(domain: str, origin: str, software: str, script: str, path: str = None): - print(f"DEBUG: domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!") + # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(origin, str) and origin is not None: - raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'") + raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif software is None: - print(f"DEBUG: software for domain='{domain}' is not set, determining ...") + # DEBUG: print(f"DEBUG: software for domain='{domain}' is not set, determining ...") software = determine_software(domain, path) - print(f"DEBUG: Determined software='{software}' for domain='{domain}'") + # DEBUG: print(f"DEBUG: Determined software='{software}' for domain='{domain}'") elif not isinstance(software, str): - raise ValueError(f"Parameter software[]={type(software)} is not 'str'") + raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'") elif not isinstance(script, str): - raise ValueError(f"Parameter script[]={type(script)} is not 'str'") + raise ValueError(f"Parameter script[]='{type(script)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") if not instances.is_registered(domain): - print("DEBUG: Adding new domain:", domain, origin) + # DEBUG: print("DEBUG: Adding new domain:", domain, origin) instances.add(domain, origin, script, path) - print("DEBUG: Fetching instances for domain:", domain, software) + # DEBUG: print("DEBUG: Fetching instances for domain:", domain, software) peerlist = fetch_peers(domain, software) if peerlist is None: print("ERROR: Cannot fetch peers:", domain) return elif instances.has_pending_instance_data(domain): - print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...") + # DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...") instances.update_data(domain) print(f"INFO: Checking {len(peerlist)} instances from {domain} ...") for instance in peerlist: if instance is None: - # Skip "None" types as tidup() cannot parse them + # Skip "None" types as tidup.domain() cannot parse them continue - print(f"DEBUG: instance='{instance}' - BEFORE") + # DEBUG: print(f"DEBUG: instance='{instance}' - BEFORE") instance = tidyup.domain(instance) - print(f"DEBUG: instance='{instance}' - AFTER") + # DEBUG: print(f"DEBUG: instance='{instance}' - AFTER") if instance == "": print("WARNING: Empty instance after tidyup.domain(), domain:", domain) @@ -92,96 +91,114 @@ def fetch_instances(domain: str, origin: str, software: str, script: str, path: print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}',software='{software}'") continue elif blacklist.is_blacklisted(instance): - print("DEBUG: instance is blacklisted:", instance) + # DEBUG: print("DEBUG: instance is blacklisted:", instance) continue - print("DEBUG: Handling instance:", instance) - try: - if not instances.is_registered(instance): - print("DEBUG: Adding new instance:", instance, domain) - instances.add(instance, domain, script) - except BaseException as exception: - print(f"ERROR: instance='{instance}',exception[{type(exception)}]:'{str(exception)}'") - continue + # DEBUG: print("DEBUG: Handling instance:", instance) + if not instances.is_registered(instance): + # DEBUG: print("DEBUG: Adding new instance:", instance, domain) + instances.add(instance, domain, script) - print("DEBUG: EXIT!") + # DEBUG: print("DEBUG: EXIT!") def fetch_peers(domain: str, software: str) -> list: - print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!") + # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(software, str) and software is not None: - raise ValueError(f"software[]={type(software)} is not 'str'") + raise ValueError(f"software[]='{type(software)}' is not 'str'") if software == "misskey": - print(f"DEBUG: Invoking misskey.fetch_peers({domain}) ...") + # DEBUG: print(f"DEBUG: Invoking misskey.fetch_peers({domain}) ...") return misskey.fetch_peers(domain) elif software == "lemmy": - print(f"DEBUG: Invoking lemmy.fetch_peers({domain}) ...") + # DEBUG: print(f"DEBUG: Invoking lemmy.fetch_peers({domain}) ...") return lemmy.fetch_peers(domain) elif software == "peertube": - print(f"DEBUG: Invoking peertube.fetch_peers({domain}) ...") + # DEBUG: print(f"DEBUG: Invoking peertube.fetch_peers({domain}) ...") return peertube.fetch_peers(domain) - print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...") + # Init peers variable peers = list() - response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))) - print(f"DEBUG: response[]='{type(response)}'") - - data = network.json_from_response(response) - print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") - - if not response.ok or response.status_code >= 400: - print("DEBUG: Was not able to fetch peers, trying alternative ...") - response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))) - - data = network.json_from_response(response) - print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") - if not response.ok or response.status_code >= 400: - print("WARNING: Could not reach any JSON API:", domain) - instances.update_last_error(domain, response) - elif response.ok and isinstance(data, list): - print(f"DEBUG: domain='{domain}' returned a list: '{data}'") - sys.exit(255) - elif "federated_instances" in data: - print(f"DEBUG: Found federated_instances for domain='{domain}'") - peers = peers + add_peers(data["federated_instances"]) - print("DEBUG: Added instance(s) to peers") + headers = tuple() + + # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") + try: + headers = csrf.determine(domain, dict()) + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during checking CSRF - EXIT!") + return + + # DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...") + data = network.get_json_api( + domain, + "/api/v1/instance/peers", + headers, + (config.get("connection_timeout"), config.get("read_timeout")) + ) + + # DEBUG: print(f"DEBUG: data[]='{type(data)}'") + if "error_message" in data: + # DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...") + data = network.get_json_api( + domain, + "/api/v3/site", + headers, + (config.get("connection_timeout"), config.get("read_timeout")) + ) + + # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") + if "error_message" in data: + print(f"WARNING: Could not reach any JSON API at domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'") + elif "federated_instances" in data["json"]: + # DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'") + peers = peers + add_peers(data["json"]["federated_instances"]) + # DEBUG: print("DEBUG: Added instance(s) to peers") else: - print("WARNING: JSON response does not contain 'federated_instances':", domain) - instances.update_last_error(domain, response) + message = "JSON response does not contain 'federated_instances' or 'error_message'" + print(f"WARNING: {message},domain='{domain}'") + instances.update_last_error(domain, message) else: - print("DEBUG: Querying API was successful:", domain, len(data)) - peers = data + # DEBUG: print("DEBUG: Querying API was successful:", domain, len(data)) + peers = data["json"] - print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") + # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") instances.set_data("total_peers", domain, len(peers)) - print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") + # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") instances.update_last_instance_fetch(domain) - print("DEBUG: Returning peers[]:", type(peers)) + # DEBUG: print("DEBUG: Returning peers[]:", type(peers)) return peers def fetch_nodeinfo(domain: str, path: str = None) -> list: - print(f"DEBUG: domain='{domain}',path={path} - CALLED!") + # DEBUG: print(f"DEBUG: domain='{domain}',path={path} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(path, str) and path is not None: - raise ValueError(f"Parameter path[]={type(path)} is not 'str'") + raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") - print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...") + # DEBUG: print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...") nodeinfo = fetch_wellknown_nodeinfo(domain) - print(f"DEBUG: nodeinfo({len(nodeinfo)})={nodeinfo}") + # DEBUG: print(f"DEBUG: nodeinfo({len(nodeinfo)})={nodeinfo}") if len(nodeinfo) > 0: - print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!") + # DEBUG: print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!") return nodeinfo + headers = tuple() + + # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") + try: + headers = csrf.determine(domain, dict()) + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during checking CSRF - EXIT!") + return + request_paths = [ "/nodeinfo/2.1.json", "/nodeinfo/2.1", @@ -193,180 +210,193 @@ def fetch_nodeinfo(domain: str, path: str = None) -> list: for request in request_paths: if path is not None and path != "" and path != request: - print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!") + # DEBUG: print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!") continue - print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...") - response = network.fetch_response(domain, request, network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))) - - data = network.json_from_response(response) - print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") - if response.ok and isinstance(data, dict): - print("DEBUG: Success:", request) + # DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...") + data = network.get_json_api( + domain, + request, + headers, + (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")) + ) + + # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") + if "error_message" not in data: + # DEBUG: print("DEBUG: Success:", request) instances.set_data("detection_mode", domain, "STATIC_CHECK") instances.set_data("nodeinfo_url" , domain, request) break - elif response.ok and isinstance(data, list): - print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'") - sys.exit(255) - elif not response.ok or response.status_code >= 400: - print("WARNING: Failed fetching nodeinfo from domain:", domain) - instances.update_last_error(domain, response) - continue - print(f"DEBUG: data()={len(data)} - EXIT!") + print(f"WARNING: Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'") + + # DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!") return data def fetch_wellknown_nodeinfo(domain: str) -> list: - print(f"DEBUG: domain='{domain}' - CALLED!") + # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") - print("DEBUG: Fetching .well-known info for domain:", domain) - response = network.fetch_response(domain, "/.well-known/nodeinfo", network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))) + headers = tuple() - data = network.json_from_response(response) - print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data)) - if response.ok and isinstance(data, dict): - nodeinfo = data - print("DEBUG: Found entries:", len(nodeinfo), domain) + # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") + try: + headers = csrf.determine(domain, dict()) + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during checking CSRF - EXIT!") + return + + # DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain) + data = network.get_json_api( + domain, + "/.well-known/nodeinfo", + headers, + (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")) + ) + + if "error_message" not in data: + nodeinfo = data["json"] + # DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain) if "links" in nodeinfo: - print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"])) + # DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"])) for link in nodeinfo["links"]: - print("DEBUG: rel,href:", link["rel"], link["href"]) + # DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"]) if link["rel"] in nodeinfo_identifier: - print("DEBUG: Fetching nodeinfo from:", link["href"]) - response = fba.fetch_url(link["href"], network.api_headers, (config.get("connection_timeout"), config.get("read_timeout"))) - - data = network.json_from_response(response) - print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code) - if response.ok and isinstance(data, dict): - print("DEBUG: Found JSON nodeinfo():", len(data)) + # DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"]) + data = network.fetch_api_url( + link["href"], + (config.get("connection_timeout"), config.get("read_timeout")) + ) + + # DEBUG: print("DEBUG: href,data[]:", link["href"], type(data)) + if "json" in data: + # DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data)) instances.set_data("detection_mode", domain, "AUTO_DISCOVERY") instances.set_data("nodeinfo_url" , domain, link["href"]) break + else: + instances.update_last_error(domain, data) else: print("WARNING: Unknown 'rel' value:", domain, link["rel"]) else: print("WARNING: nodeinfo does not contain 'links':", domain) - print("DEBUG: Returning data[]:", type(data)) + # DEBUG: print("DEBUG: Returning data[]:", type(data)) return data def fetch_generator_from_path(domain: str, path: str = "/") -> str: - print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") + # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(path, str): - raise ValueError(f"path[]={type(path)} is not 'str'") + raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' is empty") - print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!") + # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!") software = None - print(f"DEBUG: Fetching path='{path}' from '{domain}' ...") + # DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...") response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))) - print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text)) + # DEBUG: print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text)) if response.ok and response.status_code < 300 and len(response.text) > 0: - print("DEBUG: Search for :", domain) + # DEBUG: print("DEBUG: Search for :", domain) doc = bs4.BeautifulSoup(response.text, "html.parser") - print("DEBUG: doc[]:", type(doc)) - generator = doc.find("meta", {"name": "generator"}) + # DEBUG: print("DEBUG: doc[]:", type(doc)) + generator = doc.find("meta", {"name" : "generator"}) site_name = doc.find("meta", {"property": "og:site_name"}) - print(f"DEBUG: generator='{generator}',site_name='{site_name}'") + # DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'") if isinstance(generator, bs4.element.Tag): - print("DEBUG: Found generator meta tag:", domain) + # DEBUG: print("DEBUG: Found generator meta tag:", domain) software = tidyup.domain(generator.get("content")) print(f"INFO: domain='{domain}' is generated by '{software}'") instances.set_data("detection_mode", domain, "GENERATOR") - fba.remove_pending_error(domain) elif isinstance(site_name, bs4.element.Tag): - print("DEBUG: Found property=og:site_name:", domain) + # DEBUG: print("DEBUG: Found property=og:site_name:", domain) sofware = tidyup.domain(site_name.get("content")) print(f"INFO: domain='{domain}' has og:site_name='{software}'") instances.set_data("detection_mode", domain, "SITE_NAME") - fba.remove_pending_error(domain) - print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and software == "": - print(f"DEBUG: Corrected empty string to None for software of domain='{domain}'") + # DEBUG: print(f"DEBUG: Corrected empty string to None for software of domain='{domain}'") software = None elif isinstance(software, str) and ("." in software or " " in software): - print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") + # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") software = fba.remove_version(software) - print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and " powered by " in software: - print(f"DEBUG: software='{software}' has 'powered by' in it") + # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") software = fba.remove_version(fba.strip_powered_by(software)) elif isinstance(software, str) and " hosted on " in software: - print(f"DEBUG: software='{software}' has 'hosted on' in it") + # DEBUG: print(f"DEBUG: software='{software}' has 'hosted on' in it") software = fba.remove_version(fba.strip_hosted_on(software)) elif isinstance(software, str) and " by " in software: - print(f"DEBUG: software='{software}' has ' by ' in it") + # DEBUG: print(f"DEBUG: software='{software}' has ' by ' in it") software = fba.strip_until(software, " by ") elif isinstance(software, str) and " see " in software: - print(f"DEBUG: software='{software}' has ' see ' in it") + # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it") software = fba.strip_until(software, " see ") - print(f"DEBUG: software='{software}' - EXIT!") + # DEBUG: print(f"DEBUG: software='{software}' - EXIT!") return software def determine_software(domain: str, path: str = None) -> str: - print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") + # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif not isinstance(path, str) and path is not None: - raise ValueError(f"Parameter path[]={type(path)} is not 'str'") + raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") - print("DEBUG: Determining software for domain,path:", domain, path) + # DEBUG: print("DEBUG: Determining software for domain,path:", domain, path) software = None - print(f"DEBUG: Fetching nodeinfo from '{domain}' ...") + # DEBUG: print(f"DEBUG: Fetching nodeinfo from '{domain}' ...") data = fetch_nodeinfo(domain, path) - print("DEBUG: data[]:", type(data)) - if not isinstance(data, dict) or len(data) == 0: - print("DEBUG: Could not determine software type:", domain) + # DEBUG: print("DEBUG: data[]:", type(data)) + if "error_message" in data: + # DEBUG: print("DEBUG: Could not determine software type:", domain) return fetch_generator_from_path(domain) - print("DEBUG: data():", len(data), data) - if "status" in data and data["status"] == "error" and "message" in data: - print("WARNING: JSON response is an error:", data["message"]) - instances.update_last_error(domain, data["message"]) + # DEBUG: print("DEBUG: data():", len(data), data) + if "status" in data["json"] and data["json"]["status"] == "error" and "message" in data["json"]: + print("WARNING: JSON response is an error:", data["json"]["message"]) + instances.update_last_error(domain, data["json"]["message"]) return fetch_generator_from_path(domain) - elif "message" in data: + elif "message" in data["json"]: print("WARNING: JSON response contains only a message:", data["message"]) - instances.update_last_error(domain, data["message"]) + instances.update_last_error(domain, data["json"]["message"]) return fetch_generator_from_path(domain) - elif "software" not in data or "name" not in data["software"]: - print(f"DEBUG: JSON response from domain='{domain}' does not include [software][name], fetching / ...") + elif "software" not in data["json"] or "name" not in data["json"]["software"]: + # DEBUG: print(f"DEBUG: JSON response from domain='{domain}' does not include [software][name], fetching / ...") software = fetch_generator_from_path(domain) - print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!") + # DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!") return software - software = tidyup.domain(data["software"]["name"]) + software = tidyup.domain(data["json"]["software"]["name"]) - print("DEBUG: sofware after tidyup.domain():", software) + # DEBUG: print("DEBUG: sofware after tidyup.domain():", software) if software in ["akkoma", "rebased"]: - print("DEBUG: Setting pleroma:", domain, software) + # DEBUG: print("DEBUG: Setting pleroma:", domain, software) software = "pleroma" elif software in ["hometown", "ecko"]: - print("DEBUG: Setting mastodon:", domain, software) + # DEBUG: print("DEBUG: Setting mastodon:", domain, software) software = "mastodon" elif software in ["calckey", "groundpolis", "foundkey", "cherrypick", "meisskey"]: - print("DEBUG: Setting misskey:", domain, software) + # DEBUG: print("DEBUG: Setting misskey:", domain, software) software = "misskey" elif software.find("/") > 0: print("WARNING: Spliting of slash:", software) @@ -375,60 +405,60 @@ def determine_software(domain: str, path: str = None) -> str: print("WARNING: Spliting of pipe:", software) software = tidyup.domain(software.split("|")[0]) elif "powered by" in software: - print(f"DEBUG: software='{software}' has 'powered by' in it") + # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") software = fba.strip_powered_by(software) elif isinstance(software, str) and " by " in software: - print(f"DEBUG: software='{software}' has ' by ' in it") + # DEBUG: print(f"DEBUG: software='{software}' has ' by ' in it") software = fba.strip_until(software, " by ") elif isinstance(software, str) and " see " in software: - print(f"DEBUG: software='{software}' has ' see ' in it") + # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it") software = fba.strip_until(software, " see ") - print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if software == "": print("WARNING: tidyup.domain() left no software name behind:", domain) software = None - print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if str(software) == "": - print(f"DEBUG: software for '{domain}' was not detected, trying generator ...") + # DEBUG: print(f"DEBUG: software for '{domain}' was not detected, trying generator ...") software = fetch_generator_from_path(domain) elif len(str(software)) > 0 and ("." in software or " " in software): - print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") + # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") software = fba.remove_version(software) - print(f"DEBUG: software[]={type(software)}") + # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and "powered by" in software: - print(f"DEBUG: software='{software}' has 'powered by' in it") + # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") software = fba.remove_version(fba.strip_powered_by(software)) - print("DEBUG: Returning domain,software:", domain, software) + # DEBUG: print("DEBUG: Returning domain,software:", domain, software) return software def find_domains(tag: bs4.element.Tag) -> list: - print(f"DEBUG: tag[]={type(tag)} - CALLED!") + # DEBUG: print(f"DEBUG: tag[]='{type(tag)}' - CALLED!") if not isinstance(tag, bs4.element.Tag): - raise ValueError(f"Parameter tag[]={type(tag)} is not type of bs4.element.Tag") + raise ValueError(f"Parameter tag[]='{type(tag)}' is not type of bs4.element.Tag") elif len(tag.select("tr")) == 0: raise KeyError("No table rows found in table!") domains = list() for element in tag.select("tr"): - print(f"DEBUG: element[]={type(element)}") + # DEBUG: print(f"DEBUG: element[]='{type(element)}'") if not element.find("td"): - print("DEBUG: Skipping element, no found") + # DEBUG: print("DEBUG: Skipping element, no found") continue domain = tidyup.domain(element.find("td").text) reason = tidyup.reason(element.findAll("td")[1].text) - print(f"DEBUG: domain='{domain}',reason='{reason}'") + # DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'") if blacklist.is_blacklisted(domain): print(f"WARNING: domain='{domain}' is blacklisted - skipped!") continue elif domain == "gab.com/.ai, develop.gab.com": - print("DEBUG: Multiple domains detected in one row") + # DEBUG: print("DEBUG: Multiple domains detected in one row") domains.append({ "domain": "gab.com", "reason": reason, @@ -446,13 +476,13 @@ def find_domains(tag: bs4.element.Tag) -> list: print(f"WARNING: domain='{domain}' is not a valid domain - skipped!") continue - print(f"DEBUG: Adding domain='{domain}' ...") + # DEBUG: print(f"DEBUG: Adding domain='{domain}' ...") domains.append({ "domain": domain, "reason": reason, }) - print(f"DEBUG: domains()={len(domains)} - EXIT!") + # DEBUG: print(f"DEBUG: domains()={len(domains)} - EXIT!") return domains def add_peers(rows: dict) -> list: