]> git.mxchange.org Git - fba.git/blobdiff - fba/fba.py
Continued:
[fba.git] / fba / fba.py
index 1dada8cbf21ab8c0607b4eed5554b0b52b7e26a5..b175e57547aa6ebcd47d221e5ab19e3643571e2e 100644 (file)
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 import hashlib
-import re
-import json
 import sqlite3
-import sys
-import time
 
 from urllib.parse import urlparse
 
 import requests
+import validators
 
-from fba import config
+from fba import blacklist
+from fba import federation
 from fba import network
 
-# Array with pending errors needed to be written to database
-pending_errors = {
-}
+from fba.models import instances
 
 # Connect to database
 connection = sqlite3.connect("blocks.db")
 cursor = connection.cursor()
 
-# Pattern instance for version numbers
-patterns = [
-    # semantic version number (with v|V) prefix)
-    re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"),
-    # non-sematic, e.g. 1.2.3.4
-    re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(\.(?P<subpatch>0|[1-9]\d*))?)$"),
-    # non-sematic, e.g. 2023-05[-dev]
-    re.compile("^(?P<year>[1-9]{1}[0-9]{3})\.(?P<month>[0-9]{2})(-dev){0,1}$"),
-    # non-semantic, e.g. abcdef0
-    re.compile("^[a-f0-9]{7}$"),
-]
-
 ##### Other functions #####
 
 def is_primitive(var: any) -> bool:
     # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
     return type(var) in {int, str, float, bool} or var is None
 
-def remove_version(software: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
-    if not "." in software and " " not in software:
-        print(f"WARNING: software='{software}' does not contain a version number.")
-        return software
-
-    temp = software
-    if ";" in software:
-        temp = software.split(";")[0]
-    elif "," in software:
-        temp = software.split(",")[0]
-    elif " - " in software:
-        temp = software.split(" - ")[0]
-
-    # DEBUG: print(f"DEBUG: software='{software}'")
-    version = None
-    if " " in software:
-        version = temp.split(" ")[-1]
-    elif "/" in software:
-        version = temp.split("/")[-1]
-    elif "-" in software:
-        version = temp.split("-")[-1]
-    else:
-        # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
-        return software
-
-    match = None
-    # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...")
-    for pattern in patterns:
-        # Run match()
-        match = pattern.match(version)
-
-        # DEBUG: print(f"DEBUG: match[]={type(match)}")
-        if isinstance(match, re.Match):
-            # DEBUG: print(f"DEBUG: version='{version}' is matching pattern='{pattern}'")
-            break
-
-    # DEBUG: print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
-    if not isinstance(match, re.Match):
-        print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
-        return software
-
-    # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...")
-    end = len(temp) - len(version) - 1
-
-    # DEBUG: print(f"DEBUG: end[{type(end)}]={end}")
-    software = temp[0:end].strip()
-    if " version" in software:
-        # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'")
-        software = strip_until(software, " version")
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def strip_powered_by(software: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
-    if not isinstance(software, str):
-        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
-    elif software == "":
-        raise ValueError("Parameter 'software' is empty")
-    elif "powered by" not in software:
-        print(f"WARNING: Cannot find 'powered by' in software='{software}'!")
-        return software
-
-    start = software.find("powered by ")
-    # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'")
-
-    software = software[start + 11:].strip()
-    # DEBUG: print(f"DEBUG: software='{software}'")
-
-    software = strip_until(software, " - ")
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def strip_hosted_on(software: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
-    if not isinstance(software, str):
-        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
-    elif software == "":
-        raise ValueError("Parameter 'software' is empty")
-    elif "hosted on" not in software:
-        print(f"WARNING: Cannot find 'hosted on' in '{software}'!")
-        return software
-
-    end = software.find("hosted on ")
-    # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
-
-    software = software[0, end].strip()
-    # DEBUG: print(f"DEBUG: software='{software}'")
-
-    software = strip_until(software, " - ")
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def strip_until(software: str, until: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
-    if not isinstance(software, str):
-        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
-    elif software == "":
-        raise ValueError("Parameter 'software' is empty")
-    elif not isinstance(until, str):
-        raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'")
-    elif until == "":
-        raise ValueError("Parameter 'until' is empty")
-    elif not until in software:
-        print(f"WARNING: Cannot find '{until}' in '{software}'!")
-        return software
-
-    # Next, strip until part
-    end = software.find(until)
-
-    # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
-    if end > 0:
-        software = software[0:end].strip()
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def remove_pending_error(domain: str):
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-
-    try:
-        # Prevent updating any pending errors, nodeinfo was found
-        del pending_errors[domain]
-
-    except:
-        pass
-
-    # DEBUG: print("DEBUG: EXIT!")
-
 def get_hash(domain: str) -> str:
     if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
     elif domain == "":
         raise ValueError("Parameter 'domain' is empty")
 
     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
 
-def log_error(domain: str, response: requests.models.Response):
-    # DEBUG: print("DEBUG: domain,response[]:", domain, type(response))
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif config.get("write_error_log").lower() != "true":
-        # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
-        return
-
-    try:
-        # DEBUG: print("DEBUG: BEFORE response[]:", type(response))
-        if isinstance(response, BaseException) or isinstance(response, json.decoder.JSONDecodeError):
-            response = f"response[{type(response)}]='{str(response)}'"
-
-        # DEBUG: print("DEBUG: AFTER response[]:", type(response))
-        if isinstance(response, str):
-            cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
-                domain,
-                response,
-                time.time()
-            ])
-        else:
-            cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
-                domain,
-                response.status_code,
-                response.reason,
-                time.time()
-            ])
-
-        # Cleanup old entries
-        # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
-        cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
-    except BaseException as exception:
-        print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
-        sys.exit(255)
-
-    # DEBUG: print("DEBUG: EXIT!")
-
 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
     # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
     if not isinstance(url, str):
@@ -252,9 +62,72 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
     # Invoke other function, avoid trailing ?
     # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
     if components.query != "":
-        response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
+        response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
     else:
-        response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
+        response = network.fetch_response(components.netloc, f"{components.path}", headers, timeout)
 
     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
     return response
+
+def process_domain(domain: str, blocker: str, command: str) -> bool:
+    # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif not isinstance(blocker, str):
+        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
+    elif blocker == "":
+        raise ValueError("Parameter 'blocker' is empty")
+    elif not isinstance(command, str):
+        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
+    elif command == "":
+        raise ValueError("Parameter 'command' is empty")
+
+    if domain.find("*") > 0:
+        # Try to de-obscure it
+        row = instances.deobscure("*", domain)
+
+        # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+        if row is None:
+            print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
+            return False
+
+        # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+        domain = row[0]
+    elif domain.find("?") > 0:
+        # Try to de-obscure it
+        row = instances.deobscure("?", domain)
+
+        # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+        if row is None:
+            print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
+            return False
+
+        # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+        domain = row[0]
+
+    if not validators.domain(domain):
+        print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
+        return False
+    elif domain.endswith(".arpa"):
+        print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
+        return False
+    elif blacklist.is_blacklisted(domain):
+        # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
+        return False
+    elif instances.is_recent(domain):
+        # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
+        return False
+
+    processed = False
+    try:
+        print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
+        federation.fetch_instances(domain, blocker, None, command)
+        processed = True
+    except network.exceptions as exception:
+        print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
+        instances.set_last_error(domain, exception)
+
+    # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")
+    return processed