X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Ffederation.py;h=ec716172a13e67b5e8e2dab37eaae1d868794664;hb=ca9dd0d785407bc9b9573b80df9ae0496f4db4a6;hp=abee665dd31c2f752cd3935bffc3a7a5dec92892;hpb=481de3ec49efdfdbff0b442bb0d7bd8c26328e39;p=fba.git diff --git a/fba/federation.py b/fba/federation.py index abee665..ec71617 100644 --- a/fba/federation.py +++ b/fba/federation.py @@ -13,17 +13,20 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from urllib.parse import urlparse + import bs4 import validators from fba import blacklist from fba import config from fba import csrf -from fba import fba -from fba import instances from fba import network from fba.helpers import tidyup +from fba.helpers import version + +from fba.models import instances from fba.networks import lemmy from fba.networks import misskey @@ -41,28 +44,52 @@ nodeinfo_identifier = [ "http://nodeinfo.diaspora.software/ns/schema/1.0", ] -def fetch_instances(domain: str, origin: str, software: str, script: str, path: str = None): +def fetch_instances(domain: str, origin: str, software: str, command: str, path: str = None): # DEBUG: print(f"DEBUG: domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!") if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") elif not isinstance(origin, str) and origin is not None: raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif software is None: + # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") + instances.set_last_instance_fetch(domain) + # DEBUG: print(f"DEBUG: software for domain='{domain}' is not set, determining ...") - software = determine_software(domain, path) + software = None + try: + software = determine_software(domain, path) + except network.exceptions as exception: + # DEBUG: print(f"DEBUG: Exception '{type(exception)}' during determining software type") + pass + # DEBUG: print(f"DEBUG: Determined software='{software}' for domain='{domain}'") elif not isinstance(software, str): raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'") - elif not isinstance(script, str): - raise ValueError(f"Parameter script[]='{type(script)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") + elif not isinstance(command, str): + raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'") + elif command == "": + raise ValueError("Parameter 'command' is empty") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain") if not instances.is_registered(domain): - # DEBUG: print("DEBUG: Adding new domain:", domain, origin) - instances.add(domain, origin, script, path) + # DEBUG: print(f"DEBUG: Adding new domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}'") + instances.add(domain, origin, command, path, software) + + # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") + instances.set_last_instance_fetch(domain) # DEBUG: print("DEBUG: Fetching instances for domain:", domain, software) peerlist = fetch_peers(domain, software) @@ -70,12 +97,13 @@ def fetch_instances(domain: str, origin: str, software: str, script: str, path: if peerlist is None: print("ERROR: Cannot fetch peers:", domain) return - elif instances.has_pending_instance_data(domain): + elif instances.has_pending(domain): # DEBUG: print(f"DEBUG: domain='{domain}' has pending nodeinfo data, flushing ...") instances.update_data(domain) - print(f"INFO: Checking {len(peerlist)} instances from {domain} ...") + print(f"INFO: Checking {len(peerlist)} instances from domain='{domain}' ...") for instance in peerlist: + # DEBUG: print(f"DEBUG: instance='{instance}'") if instance is None: # Skip "None" types as tidup.domain() cannot parse them continue @@ -85,28 +113,41 @@ def fetch_instances(domain: str, origin: str, software: str, script: str, path: # DEBUG: print(f"DEBUG: instance='{instance}' - AFTER") if instance == "": - print("WARNING: Empty instance after tidyup.domain(), domain:", domain) + print(f"WARNING: Empty instance after tidyup.domain(), domain='{domain}'") continue elif not validators.domain(instance.split("/")[0]): - print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}',software='{software}'") + print(f"WARNING: Bad instance='{instance}' from domain='{domain}',origin='{origin}'") + continue + elif instance.endswith(".arpa"): + print(f"WARNING: instance='{instance}' is a reversed .arpa domain and should not be used generally.") continue elif blacklist.is_blacklisted(instance): # DEBUG: print("DEBUG: instance is blacklisted:", instance) continue - - # DEBUG: print("DEBUG: Handling instance:", instance) - if not instances.is_registered(instance): + elif instance.find("/profile/") > 0 or instance.find("/users/") > 0: + # DEBUG: print(f"DEBUG: instance='{instance}' is a link to a single user profile - SKIPPED!") + continue + elif instance.endswith(".tld"): + # DEBUG: print(f"DEBUG: instance='{instance}' is a fake domain - SKIPPED!") + continue + elif not instances.is_registered(instance): # DEBUG: print("DEBUG: Adding new instance:", instance, domain) - instances.add(instance, domain, script) + instances.add(instance, domain, command) # DEBUG: print("DEBUG: EXIT!") def fetch_peers(domain: str, software: str) -> list: - # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},software={software} - CALLED!") + # DEBUG: print(f"DEBUG: domain({len(domain)})='{domain}',software='{software}' - CALLED!") if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") elif not isinstance(software, str) and software is not None: raise ValueError(f"software[]='{type(software)}' is not 'str'") @@ -122,6 +163,7 @@ def fetch_peers(domain: str, software: str) -> list: # Init peers variable peers = list() + # No CSRF by default, you don't have to add network.api_headers by yourself here headers = tuple() @@ -130,6 +172,7 @@ def fetch_peers(domain: str, software: str) -> list: headers = csrf.determine(domain, dict()) except network.exceptions as exception: print(f"WARNING: Exception '{type(exception)}' during checking CSRF (fetch_peers,{__name__}) - EXIT!") + instances.set_last_error(domain, exception) return peers # DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...") @@ -160,46 +203,57 @@ def fetch_peers(domain: str, software: str) -> list: else: message = "JSON response does not contain 'federated_instances' or 'error_message'" print(f"WARNING: {message},domain='{domain}'") - instances.update_last_error(domain, message) - else: - # DEBUG: print("DEBUG: Querying API was successful:", domain, len(data)) + instances.set_last_error(domain, message) + elif isinstance(data["json"], list): + # DEBUG print("DEBUG: Querying API was successful:", domain, len(data['json'])) peers = data["json"] + else: + print(f"WARNING: Cannot parse data[json][]='{type(data['json'])}'") # DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'") - instances.set_data("total_peers", domain, len(peers)) - - # DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...") - instances.update_last_instance_fetch(domain) + instances.set_total_peers(domain, peers) # DEBUG: print("DEBUG: Returning peers[]:", type(peers)) return peers def fetch_nodeinfo(domain: str, path: str = None) -> dict: - # DEBUG: print(f"DEBUG: domain='{domain}',path={path} - CALLED!") + # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!") if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") elif not isinstance(path, str) and path is not None: raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") # DEBUG: print(f"DEBUG: Fetching nodeinfo from domain='{domain}' ...") nodeinfo = fetch_wellknown_nodeinfo(domain) - # DEBUG: print(f"DEBUG: nodeinfo[{type(nodeinfo)}]='{nodeinfo}'") - if "error_message" in nodeinfo: - print(f"WARNING: Error during fetching nodeinfo: '{nodeinfo['error_message']}' - EXIT!") - return nodeinfo + # DEBUG: print(f"DEBUG: nodeinfo[{type(nodeinfo)}]({len(nodeinfo)}='{nodeinfo}'") + if "error_message" not in nodeinfo and "json" in nodeinfo and len(nodeinfo["json"]) > 0: + # DEBUG: print(f"DEBUG: Found nodeinfo[json]()={len(nodeinfo['json'])} - EXIT!") + return nodeinfo["json"] # No CSRF by default, you don't have to add network.api_headers by yourself here headers = tuple() + data = dict() try: # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") headers = csrf.determine(domain, dict()) except network.exceptions as exception: print(f"WARNING: Exception '{type(exception)}' during checking CSRF (nodeinfo,{__name__}) - EXIT!") - return dict() + instances.set_last_error(domain, exception) + return { + "status_code" : 500, + "error_message": f"exception[{type(exception)}]='{str(exception)}'", + "exception" : exception, + } request_paths = [ "/nodeinfo/2.1.json", @@ -211,27 +265,29 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict: ] for request in request_paths: - # DEBUG: print(f"DEBUG: path[{type(path)}]='{path}',request='{request'}") - if path is not None and path != "" and path != request: - # DEBUG: print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!") - continue - - # DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...") - data = network.get_json_api( - domain, - request, - headers, - (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")) - ) - - # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") - if "error_message" not in data: - # DEBUG: print("DEBUG: Success:", request) - instances.set_data("detection_mode", domain, "STATIC_CHECK") - instances.set_data("nodeinfo_url" , domain, request) - break - - print(f"WARNING: Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'") + # DEBUG: print(f"DEBUG: path[{type(path)}]='{path}',request='{request}'") + if path is None or path == request or path == f"http://{domain}{path}" or path == f"https://{domain}{path}": + # DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...") + if path == f"http://{domain}{path}" or path == f"https://{domain}{path}": + # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' has protocol in path, splitting ...") + components = urlparse(path) + path = components.path + + data = network.get_json_api( + domain, + request, + headers, + (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")) + ) + + # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'") + if "error_message" not in data: + # DEBUG: print("DEBUG: Success:", request) + instances.set_detection_mode(domain, "STATIC_CHECK") + instances.set_nodeinfo_url(domain, request) + break + + print(f"WARNING: Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'") # DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!") return data @@ -242,6 +298,12 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict: raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") # No CSRF by default, you don't have to add network.api_headers by yourself here headers = tuple() @@ -250,10 +312,12 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict: # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'") headers = csrf.determine(domain, dict()) except network.exceptions as exception: - print(f"WARNING: Exception '{type(exception)}' during checking CSRF (fetch_wellknown,{__name__}) - EXIT!") + print(f"WARNING: Exception '{type(exception)}' during checking CSRF (fetch_wellknown_nodeinfo,{__name__}) - EXIT!") + instances.set_last_error(domain, exception) return { "status_code" : 500, - "error_message": type(exception) + "error_message": type(exception), + "exception" : exception, } # DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain) @@ -270,22 +334,41 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict: if "links" in nodeinfo: # DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"])) for link in nodeinfo["links"]: - # DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"]) - if link["rel"] in nodeinfo_identifier: - # DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"]) + # DEBUG: print(f"DEBUG: link[{type(link)}]='{link}'") + if not isinstance(link, dict) or not "rel" in link: + print(f"WARNING: link[]='{type(link)}' is not 'dict' or no element 'rel' found") + elif link["rel"] in nodeinfo_identifier: + # Default is that 'href' has a complete URL, but some hosts don't send that + url = link["href"] + components = urlparse(link["href"]) + + # DEBUG: print(f"DEBUG: components[{type(components)}]='{components}'") + if components.scheme == "" and components.netloc == "": + # DEBUG: print(f"DEBUG: link[href]='{link['href']}' has no scheme and host name in it, prepending from domain='{domain}'") + url = f"https://{domain}{url}" + components = urlparse(url) + + if blacklist.is_blacklisted(components.netloc): + print(f"WARNING: components.netloc='{components.netloc}' is blacklisted - SKIPPED!") + continue + elif not validators.domain(components.netloc): + print(f"WARNING: components.netloc='{components.netloc}' is not a valid domain - SKIPPED!") + continue + + # DEBUG: print("DEBUG: Fetching nodeinfo from:", url) data = network.fetch_api_url( - link["href"], + url, (config.get("connection_timeout"), config.get("read_timeout")) ) # DEBUG: print("DEBUG: href,data[]:", link["href"], type(data)) - if "json" in data: + if "error_message" not in data and "json" in data: # DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data)) - instances.set_data("detection_mode", domain, "AUTO_DISCOVERY") - instances.set_data("nodeinfo_url" , domain, link["href"]) + instances.set_detection_mode(domain, "AUTO_DISCOVERY") + instances.set_nodeinfo_url(domain, link["href"]) break else: - instances.update_last_error(domain, data) + instances.set_last_error(domain, data) else: print("WARNING: Unknown 'rel' value:", domain, link["rel"]) else: @@ -295,11 +378,17 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict: return data def fetch_generator_from_path(domain: str, path: str = "/") -> str: - # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") + # DEBUG: print(f"DEBUG: domain({len(domain)})='{domain}',path='{path}' - CALLED!") if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") elif not isinstance(path, str): raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": @@ -321,16 +410,20 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: site_name = doc.find("meta", {"property": "og:site_name"}) # DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'") - if isinstance(generator, bs4.element.Tag): + if isinstance(generator, bs4.element.Tag) and isinstance(generator.get("content"), str): # DEBUG: print("DEBUG: Found generator meta tag:", domain) software = tidyup.domain(generator.get("content")) - print(f"INFO: domain='{domain}' is generated by '{software}'") - instances.set_data("detection_mode", domain, "GENERATOR") - elif isinstance(site_name, bs4.element.Tag): + # DEBUG: print(f"DEBUG: software[{type(software)}]='{software}'") + if software is not None and software != "": + print(f"INFO: domain='{domain}' is generated by '{software}'") + instances.set_detection_mode(domain, "GENERATOR") + elif isinstance(site_name, bs4.element.Tag) and isinstance(site_name.get("content"), str): # DEBUG: print("DEBUG: Found property=og:site_name:", domain) - sofware = tidyup.domain(site_name.get("content")) - print(f"INFO: domain='{domain}' has og:site_name='{software}'") - instances.set_data("detection_mode", domain, "SITE_NAME") + software = tidyup.domain(site_name.get("content")) + # DEBUG: print(f"DEBUG: software[{type(software)}]='{software}'") + if software is not None and software != "": + print(f"INFO: domain='{domain}' has og:site_name='{software}'") + instances.set_detection_mode(domain, "SITE_NAME") # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and software == "": @@ -338,27 +431,27 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: software = None elif isinstance(software, str) and ("." in software or " " in software): # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") - software = fba.remove_version(software) + software = version.remove(software) # DEBUG: print(f"DEBUG: software[]='{type(software)}'") - if isinstance(software, str) and " powered by " in software: + if isinstance(software, str) and "powered by " in software: # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") - software = fba.remove_version(fba.strip_powered_by(software)) + software = version.remove(version.strip_powered_by(software)) elif isinstance(software, str) and " hosted on " in software: # DEBUG: print(f"DEBUG: software='{software}' has 'hosted on' in it") - software = fba.remove_version(fba.strip_hosted_on(software)) + software = version.remove(version.strip_hosted_on(software)) elif isinstance(software, str) and " by " in software: # DEBUG: print(f"DEBUG: software='{software}' has ' by ' in it") - software = fba.strip_until(software, " by ") + software = version.strip_until(software, " by ") elif isinstance(software, str) and " see " in software: # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it") - software = fba.strip_until(software, " see ") + software = version.strip_until(software, " see ") # DEBUG: print(f"DEBUG: software='{software}' - EXIT!") return software def determine_software(domain: str, path: str = None) -> str: - # DEBUG: print(f"DEBUG: domain({len(domain)})={domain},path={path} - CALLED!") + # DEBUG: print(f"DEBUG: domain({len(domain)})='{domain}',path='{path}' - CALLED!") if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": @@ -373,38 +466,50 @@ def determine_software(domain: str, path: str = None) -> str: data = fetch_nodeinfo(domain, path) # DEBUG: print(f"DEBUG: data[{type(data)}]='{data}'") - if "error_message" in data: - # DEBUG: print("DEBUG: Could not determine software type:", domain) + if "exception" in data: + # Continue raising it + raise data["exception"] + elif "error_message" in data: + # DEBUG: print(f"DEBUG: Returned error_message during fetching nodeinfo: '{data['error_message']}',status_code='{data['status_code']}'") return fetch_generator_from_path(domain) - - # DEBUG: print("DEBUG: data():", len(data), data) - if "status" in data["json"] and data["json"]["status"] == "error" and "message" in data["json"]: - print("WARNING: JSON response is an error:", data["json"]["message"]) - instances.update_last_error(domain, data["json"]["message"]) + elif "status" in data and data["status"] == "error" and "message" in data: + print("WARNING: JSON response is an error:", data["message"]) + instances.set_last_error(domain, data["message"]) return fetch_generator_from_path(domain) - elif "message" in data["json"]: + elif "message" in data: print("WARNING: JSON response contains only a message:", data["message"]) - instances.update_last_error(domain, data["json"]["message"]) + instances.set_last_error(domain, data["message"]) return fetch_generator_from_path(domain) - elif "software" not in data["json"] or "name" not in data["json"]["software"]: + elif "software" not in data or "name" not in data["software"]: # DEBUG: print(f"DEBUG: JSON response from domain='{domain}' does not include [software][name], fetching / ...") software = fetch_generator_from_path(domain) + # DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: '{software}'") + elif "software" in data and "name" in data["software"]: + # DEBUG: print("DEBUG: Found data[software][name] in JSON response") + software = data["software"]["name"] - # DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!") - return software - - software = tidyup.domain(data["json"]["software"]["name"]) + if software is None: + # DEBUG: print("DEBUG: Returning None - EXIT!") + return None + sofware = tidyup.domain(software) # DEBUG: print("DEBUG: sofware after tidyup.domain():", software) - if software in ["akkoma", "rebased"]: + + if software in ["akkoma", "rebased", "akkounfucked", "ched"]: # DEBUG: print("DEBUG: Setting pleroma:", domain, software) software = "pleroma" elif software in ["hometown", "ecko"]: # DEBUG: print("DEBUG: Setting mastodon:", domain, software) software = "mastodon" - elif software in ["calckey", "groundpolis", "foundkey", "cherrypick", "meisskey"]: + elif software in ["slipfox calckey", "calckey", "groundpolis", "foundkey", "cherrypick", "meisskey", "magnetar", "keybump"]: # DEBUG: print("DEBUG: Setting misskey:", domain, software) software = "misskey" + elif software == "runtube.re": + # DEBUG: print("DEBUG: Setting peertube:", domain, software) + software = "peertube" + elif software == "nextcloud social": + # DEBUG: print("DEBUG: Setting nextcloud:", domain, software) + software = "nextcloud" elif software.find("/") > 0: print("WARNING: Spliting of slash:", software) software = tidyup.domain(software.split("/")[-1]) @@ -413,13 +518,13 @@ def determine_software(domain: str, path: str = None) -> str: software = tidyup.domain(software.split("|")[0]) elif "powered by" in software: # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") - software = fba.strip_powered_by(software) + software = version.strip_powered_by(software) elif isinstance(software, str) and " by " in software: # DEBUG: print(f"DEBUG: software='{software}' has ' by ' in it") - software = fba.strip_until(software, " by ") + software = version.strip_until(software, " by ") elif isinstance(software, str) and " see " in software: # DEBUG: print(f"DEBUG: software='{software}' has ' see ' in it") - software = fba.strip_until(software, " see ") + software = version.strip_until(software, " see ") # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if software == "": @@ -432,12 +537,12 @@ def determine_software(domain: str, path: str = None) -> str: software = fetch_generator_from_path(domain) elif len(str(software)) > 0 and ("." in software or " " in software): # DEBUG: print(f"DEBUG: software='{software}' may contain a version number, domain='{domain}', removing it ...") - software = fba.remove_version(software) + software = version.remove(software) # DEBUG: print(f"DEBUG: software[]='{type(software)}'") if isinstance(software, str) and "powered by" in software: # DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it") - software = fba.remove_version(fba.strip_powered_by(software)) + software = version.remove(version.strip_powered_by(software)) # DEBUG: print("DEBUG: Returning domain,software:", domain, software) return software @@ -462,7 +567,7 @@ def find_domains(tag: bs4.element.Tag) -> list: # DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'") if blacklist.is_blacklisted(domain): - print(f"WARNING: domain='{domain}' is blacklisted - skipped!") + print(f"WARNING: domain='{domain}' is blacklisted - SKIPPED!") continue elif domain == "gab.com/.ai, develop.gab.com": # DEBUG: print("DEBUG: Multiple domains detected in one row") @@ -479,11 +584,11 @@ def find_domains(tag: bs4.element.Tag) -> list: "reason": reason, }) continue - elif not validators.domain(domain): - print(f"WARNING: domain='{domain}' is not a valid domain - skipped!") + elif not validators.domain(domain.split("/")[0]): + print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!") continue - # DEBUG: print(f"DEBUG: Adding domain='{domain}' ...") + # DEBUG: print(f"DEBUG: Adding domain='{domain}',reason='{reason}' ...") domains.append({ "domain": domain, "reason": reason,