X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Ffba.py;h=b175e57547aa6ebcd47d221e5ab19e3643571e2e;hb=bb3f9e35709b79b790b949ea65ac0a323957d9b5;hp=9ea0db030060da51c053f646d61089867deb3e08;hpb=f71083f2182908ee7bd6e8c24bdeeb3778cb9b41;p=fba.git diff --git a/fba/fba.py b/fba/fba.py index 9ea0db0..b175e57 100644 --- a/fba/fba.py +++ b/fba/fba.py @@ -14,203 +14,37 @@ # along with this program. If not, see . import hashlib -import re -import json import sqlite3 -import time from urllib.parse import urlparse import requests +import validators -from fba import config +from fba import blacklist +from fba import federation from fba import network +from fba.models import instances + # Connect to database connection = sqlite3.connect("blocks.db") cursor = connection.cursor() -# Pattern instance for version numbers -patterns = [ - # semantic version number (with v|V) prefix) - re.compile("^(?Pv|V{0,1})(\.{0,1})(?P0|[1-9]\d*)\.(?P0+|[1-9]\d*)(\.(?P0+|[1-9]\d*)(?:-(?P(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"), - # non-sematic, e.g. 1.2.3.4 - re.compile("^(?Pv|V{0,1})(\.{0,1})(?P0|[1-9]\d*)\.(?P0+|[1-9]\d*)(\.(?P0+|[1-9]\d*)(\.(?P0|[1-9]\d*))?)$"), - # non-sematic, e.g. 2023-05[-dev] - re.compile("^(?P[1-9]{1}[0-9]{3})\.(?P[0-9]{2})(-dev){0,1}$"), - # non-semantic, e.g. abcdef0 - re.compile("^[a-f0-9]{7}$"), -] - ##### Other functions ##### def is_primitive(var: any) -> bool: # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!") return type(var) in {int, str, float, bool} or var is None -def remove_version(software: str) -> str: - # DEBUG: print(f"DEBUG: software='{software}' - CALLED!") - if not "." in software and " " not in software: - print(f"WARNING: software='{software}' does not contain a version number.") - return software - - temp = software - if ";" in software: - temp = software.split(";")[0] - elif "," in software: - temp = software.split(",")[0] - elif " - " in software: - temp = software.split(" - ")[0] - - # DEBUG: print(f"DEBUG: software='{software}'") - version = None - if " " in software: - version = temp.split(" ")[-1] - elif "/" in software: - version = temp.split("/")[-1] - elif "-" in software: - version = temp.split("-")[-1] - else: - # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'") - return software - - match = None - # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...") - for pattern in patterns: - # Run match() - match = pattern.match(version) - - # DEBUG: print(f"DEBUG: match[]={type(match)}") - if isinstance(match, re.Match): - # DEBUG: print(f"DEBUG: version='{version}' is matching pattern='{pattern}'") - break - - # DEBUG: print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'") - if not isinstance(match, re.Match): - print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.") - return software - - # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...") - end = len(temp) - len(version) - 1 - - # DEBUG: print(f"DEBUG: end[{type(end)}]={end}") - software = temp[0:end].strip() - if " version" in software: - # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'") - software = strip_until(software, " version") - - # DEBUG: print(f"DEBUG: software='{software}' - EXIT!") - return software - -def strip_powered_by(software: str) -> str: - # DEBUG: print(f"DEBUG: software='{software}' - CALLED!") - if not isinstance(software, str): - raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'") - elif software == "": - raise ValueError("Parameter 'software' is empty") - elif "powered by" not in software: - print(f"WARNING: Cannot find 'powered by' in software='{software}'!") - return software - - start = software.find("powered by ") - # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'") - - software = software[start + 11:].strip() - # DEBUG: print(f"DEBUG: software='{software}'") - - software = strip_until(software, " - ") - - # DEBUG: print(f"DEBUG: software='{software}' - EXIT!") - return software - -def strip_hosted_on(software: str) -> str: - # DEBUG: print(f"DEBUG: software='{software}' - CALLED!") - if not isinstance(software, str): - raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'") - elif software == "": - raise ValueError("Parameter 'software' is empty") - elif "hosted on" not in software: - print(f"WARNING: Cannot find 'hosted on' in '{software}'!") - return software - - end = software.find("hosted on ") - # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'") - - software = software[0, end].strip() - # DEBUG: print(f"DEBUG: software='{software}'") - - software = strip_until(software, " - ") - - # DEBUG: print(f"DEBUG: software='{software}' - EXIT!") - return software - -def strip_until(software: str, until: str) -> str: - # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!") - if not isinstance(software, str): - raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'") - elif software == "": - raise ValueError("Parameter 'software' is empty") - elif not isinstance(until, str): - raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'") - elif until == "": - raise ValueError("Parameter 'until' is empty") - elif not until in software: - print(f"WARNING: Cannot find '{until}' in '{software}'!") - return software - - # Next, strip until part - end = software.find(until) - - # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'") - if end > 0: - software = software[0:end].strip() - - # DEBUG: print(f"DEBUG: software='{software}' - EXIT!") - return software - def get_hash(domain: str) -> str: if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") return hashlib.sha256(domain.encode("utf-8")).hexdigest() -def log_error(domain: str, error: dict): - # DEBUG: print("DEBUG: domain,error[]:", domain, type(error)) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif config.get("write_error_log").lower() != "true": - # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!") - return - - # DEBUG: print("DEBUG: BEFORE error[]:", type(error)) - if isinstance(error, BaseException) or isinstance(error, json.decoder.JSONDecodeError): - error = f"error[{type(error)}]='{str(error)}'" - - # DEBUG: print("DEBUG: AFTER error[]:", type(error)) - if isinstance(error, str): - cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[ - domain, - error, - time.time() - ]) - else: - cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[ - domain, - error["status_code"], - error["error_message"], - time.time() - ]) - - # Cleanup old entries - # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})") - cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")]) - - # DEBUG: print("DEBUG: EXIT!") - def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response: # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!") if not isinstance(url, str): @@ -228,9 +62,72 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon # Invoke other function, avoid trailing ? # DEBUG: print(f"DEBUG: components[{type(components)}]={components}") if components.query != "": - response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout) + response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout) else: - response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout) + response = network.fetch_response(components.netloc, f"{components.path}", headers, timeout) # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!") return response + +def process_domain(domain: str, blocker: str, command: str) -> bool: + # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!") + if not isinstance(domain, str): + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") + elif domain == "": + raise ValueError("Parameter 'domain' is empty") + elif not isinstance(blocker, str): + raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'") + elif blocker == "": + raise ValueError("Parameter 'blocker' is empty") + elif not isinstance(command, str): + raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'") + elif command == "": + raise ValueError("Parameter 'command' is empty") + + if domain.find("*") > 0: + # Try to de-obscure it + row = instances.deobscure("*", domain) + + # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'") + if row is None: + print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!") + return False + + # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'") + domain = row[0] + elif domain.find("?") > 0: + # Try to de-obscure it + row = instances.deobscure("?", domain) + + # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'") + if row is None: + print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!") + return False + + # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'") + domain = row[0] + + if not validators.domain(domain): + print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!") + return False + elif domain.endswith(".arpa"): + print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.") + return False + elif blacklist.is_blacklisted(domain): + # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!") + return False + elif instances.is_recent(domain): + # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!") + return False + + processed = False + try: + print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...") + federation.fetch_instances(domain, blocker, None, command) + processed = True + except network.exceptions as exception: + print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'") + instances.set_last_error(domain, exception) + + # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!") + return processed