]> git.mxchange.org Git - fba.git/blobdiff - fba/fba.py
Continued:
[fba.git] / fba / fba.py
index 4ae3c51cc3a237738078f5cabb3d964484d0c373..b175e57547aa6ebcd47d221e5ab19e3643571e2e 100644 (file)
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 import hashlib
-import re
-import json
 import sqlite3
-import time
 
 from urllib.parse import urlparse
 
 import requests
+import validators
 
-from fba import config
+from fba import blacklist
+from fba import federation
 from fba import network
 
+from fba.models import instances
+
 # Connect to database
 connection = sqlite3.connect("blocks.db")
 cursor = connection.cursor()
 
-# Pattern instance for version numbers
-patterns = [
-    # semantic version number (with v|V) prefix)
-    re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"),
-    # non-sematic, e.g. 1.2.3.4
-    re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(\.(?P<subpatch>0|[1-9]\d*))?)$"),
-    # non-sematic, e.g. 2023-05[-dev]
-    re.compile("^(?P<year>[1-9]{1}[0-9]{3})\.(?P<month>[0-9]{2})(-dev){0,1}$"),
-    # non-semantic, e.g. abcdef0
-    re.compile("^[a-f0-9]{7}$"),
-]
-
 ##### Other functions #####
 
 def is_primitive(var: any) -> bool:
     # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
     return type(var) in {int, str, float, bool} or var is None
 
-def remove_version(software: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
-    if not "." in software and " " not in software:
-        print(f"WARNING: software='{software}' does not contain a version number.")
-        return software
-
-    temp = software
-    if ";" in software:
-        temp = software.split(";")[0]
-    elif "," in software:
-        temp = software.split(",")[0]
-    elif " - " in software:
-        temp = software.split(" - ")[0]
-
-    # DEBUG: print(f"DEBUG: software='{software}'")
-    version = None
-    if " " in software:
-        version = temp.split(" ")[-1]
-    elif "/" in software:
-        version = temp.split("/")[-1]
-    elif "-" in software:
-        version = temp.split("-")[-1]
-    else:
-        # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
-        return software
-
-    match = None
-    # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...")
-    for pattern in patterns:
-        # Run match()
-        match = pattern.match(version)
-
-        # DEBUG: print(f"DEBUG: match[]='{type(match)}'")
-        if isinstance(match, re.Match):
-            # DEBUG: print(f"DEBUG: version='{version}' is matching pattern='{pattern}'")
-            break
-
-    # DEBUG: print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
-    if not isinstance(match, re.Match):
-        print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
-        return software
-
-    # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...")
-    end = len(temp) - len(version) - 1
-
-    # DEBUG: print(f"DEBUG: end[{type(end)}]={end}")
-    software = temp[0:end].strip()
-    if " version" in software:
-        # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'")
-        software = strip_until(software, " version")
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def strip_powered_by(software: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
-    if not isinstance(software, str):
-        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
-    elif software == "":
-        raise ValueError("Parameter 'software' is empty")
-    elif "powered by" not in software:
-        print(f"WARNING: Cannot find 'powered by' in software='{software}'!")
-        return software
-
-    start = software.find("powered by ")
-    # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'")
-
-    software = software[start + 11:].strip()
-    # DEBUG: print(f"DEBUG: software='{software}'")
-
-    software = strip_until(software, " - ")
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def strip_hosted_on(software: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
-    if not isinstance(software, str):
-        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
-    elif software == "":
-        raise ValueError("Parameter 'software' is empty")
-    elif "hosted on" not in software:
-        print(f"WARNING: Cannot find 'hosted on' in '{software}'!")
-        return software
-
-    end = software.find("hosted on ")
-    # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
-
-    software = software[0, end].strip()
-    # DEBUG: print(f"DEBUG: software='{software}'")
-
-    software = strip_until(software, " - ")
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
-def strip_until(software: str, until: str) -> str:
-    # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
-    if not isinstance(software, str):
-        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
-    elif software == "":
-        raise ValueError("Parameter 'software' is empty")
-    elif not isinstance(until, str):
-        raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'")
-    elif until == "":
-        raise ValueError("Parameter 'until' is empty")
-    elif not until in software:
-        print(f"WARNING: Cannot find '{until}' in '{software}'!")
-        return software
-
-    # Next, strip until part
-    end = software.find(until)
-
-    # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
-    if end > 0:
-        software = software[0:end].strip()
-
-    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
-    return software
-
 def get_hash(domain: str) -> str:
     if not isinstance(domain, str):
         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
@@ -176,41 +45,6 @@ def get_hash(domain: str) -> str:
 
     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
 
-def log_error(domain: str, error: dict):
-    # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif config.get("write_error_log").lower() != "true":
-        # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
-        return
-
-    # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
-    if isinstance(error, BaseException) or isinstance(error, json.decoder.JSONDecodeError):
-        error = f"error[{type(error)}]='{str(error)}'"
-
-    # DEBUG: print("DEBUG: AFTER error[]:", type(error))
-    if isinstance(error, str):
-        cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
-            domain,
-            error,
-            time.time()
-        ])
-    else:
-        cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
-            domain,
-            error["status_code"],
-            error["error_message"],
-            time.time()
-        ])
-
-    # Cleanup old entries
-    # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
-    cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
-
-    # DEBUG: print("DEBUG: EXIT!")
-
 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
     # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
     if not isinstance(url, str):
@@ -228,9 +62,72 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
     # Invoke other function, avoid trailing ?
     # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
     if components.query != "":
-        response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
+        response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
     else:
-        response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
+        response = network.fetch_response(components.netloc, f"{components.path}", headers, timeout)
 
     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
     return response
+
+def process_domain(domain: str, blocker: str, command: str) -> bool:
+    # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif not isinstance(blocker, str):
+        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
+    elif blocker == "":
+        raise ValueError("Parameter 'blocker' is empty")
+    elif not isinstance(command, str):
+        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
+    elif command == "":
+        raise ValueError("Parameter 'command' is empty")
+
+    if domain.find("*") > 0:
+        # Try to de-obscure it
+        row = instances.deobscure("*", domain)
+
+        # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+        if row is None:
+            print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
+            return False
+
+        # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+        domain = row[0]
+    elif domain.find("?") > 0:
+        # Try to de-obscure it
+        row = instances.deobscure("?", domain)
+
+        # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+        if row is None:
+            print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
+            return False
+
+        # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+        domain = row[0]
+
+    if not validators.domain(domain):
+        print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
+        return False
+    elif domain.endswith(".arpa"):
+        print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
+        return False
+    elif blacklist.is_blacklisted(domain):
+        # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
+        return False
+    elif instances.is_recent(domain):
+        # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
+        return False
+
+    processed = False
+    try:
+        print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
+        federation.fetch_instances(domain, blocker, None, command)
+        processed = True
+    except network.exceptions as exception:
+        print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
+        instances.set_last_error(domain, exception)
+
+    # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")
+    return processed