X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=fba%2Ffba.py;h=b175e57547aa6ebcd47d221e5ab19e3643571e2e;hb=bb3f9e35709b79b790b949ea65ac0a323957d9b5;hp=9ea0db030060da51c053f646d61089867deb3e08;hpb=f71083f2182908ee7bd6e8c24bdeeb3778cb9b41;p=fba.git
diff --git a/fba/fba.py b/fba/fba.py
index 9ea0db0..b175e57 100644
--- a/fba/fba.py
+++ b/fba/fba.py
@@ -14,203 +14,37 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import hashlib
-import re
-import json
import sqlite3
-import time
from urllib.parse import urlparse
import requests
+import validators
-from fba import config
+from fba import blacklist
+from fba import federation
from fba import network
+from fba.models import instances
+
# Connect to database
# NOTE: this runs at import time. "blocks.db" is a relative path, so the
# database file is resolved against the current working directory of the
# process that imports this module.
connection = sqlite3.connect("blocks.db")
# Module-level cursor created from the connection above.
cursor = connection.cursor()
-# Pattern instance for version numbers
-patterns = [
- # semantic version number (with v|V) prefix)
- re.compile("^(?Pv|V{0,1})(\.{0,1})(?P0|[1-9]\d*)\.(?P0+|[1-9]\d*)(\.(?P0+|[1-9]\d*)(?:-(?P(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"),
- # non-sematic, e.g. 1.2.3.4
- re.compile("^(?Pv|V{0,1})(\.{0,1})(?P0|[1-9]\d*)\.(?P0+|[1-9]\d*)(\.(?P0+|[1-9]\d*)(\.(?P0|[1-9]\d*))?)$"),
- # non-sematic, e.g. 2023-05[-dev]
- re.compile("^(?P[1-9]{1}[0-9]{3})\.(?P[0-9]{2})(-dev){0,1}$"),
- # non-semantic, e.g. abcdef0
- re.compile("^[a-f0-9]{7}$"),
-]
-
##### Other functions #####
def is_primitive(var: object) -> bool:
    """Tell whether var is None or exactly an int, str, float or bool.

    The check compares the exact type of var, so instances of subclasses of
    these types (other than bool itself, which is listed) are not treated as
    primitive.

    Args:
        var: Any value to inspect.

    Returns:
        True if var is None or its type is one of int, str, float, bool;
        False otherwise.
    """
    # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
    return var is None or type(var) in {int, str, float, bool}
-def remove_version(software: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
- if not "." in software and " " not in software:
- print(f"WARNING: software='{software}' does not contain a version number.")
- return software
-
- temp = software
- if ";" in software:
- temp = software.split(";")[0]
- elif "," in software:
- temp = software.split(",")[0]
- elif " - " in software:
- temp = software.split(" - ")[0]
-
- # DEBUG: print(f"DEBUG: software='{software}'")
- version = None
- if " " in software:
- version = temp.split(" ")[-1]
- elif "/" in software:
- version = temp.split("/")[-1]
- elif "-" in software:
- version = temp.split("-")[-1]
- else:
- # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
- return software
-
- match = None
- # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...")
- for pattern in patterns:
- # Run match()
- match = pattern.match(version)
-
- # DEBUG: print(f"DEBUG: match[]={type(match)}")
- if isinstance(match, re.Match):
- # DEBUG: print(f"DEBUG: version='{version}' is matching pattern='{pattern}'")
- break
-
- # DEBUG: print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
- if not isinstance(match, re.Match):
- print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
- return software
-
- # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...")
- end = len(temp) - len(version) - 1
-
- # DEBUG: print(f"DEBUG: end[{type(end)}]={end}")
- software = temp[0:end].strip()
- if " version" in software:
- # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'")
- software = strip_until(software, " version")
-
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
- return software
-
-def strip_powered_by(software: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
- if not isinstance(software, str):
- raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
- elif software == "":
- raise ValueError("Parameter 'software' is empty")
- elif "powered by" not in software:
- print(f"WARNING: Cannot find 'powered by' in software='{software}'!")
- return software
-
- start = software.find("powered by ")
- # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'")
-
- software = software[start + 11:].strip()
- # DEBUG: print(f"DEBUG: software='{software}'")
-
- software = strip_until(software, " - ")
-
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
- return software
-
-def strip_hosted_on(software: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
- if not isinstance(software, str):
- raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
- elif software == "":
- raise ValueError("Parameter 'software' is empty")
- elif "hosted on" not in software:
- print(f"WARNING: Cannot find 'hosted on' in '{software}'!")
- return software
-
- end = software.find("hosted on ")
- # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
-
- software = software[0, end].strip()
- # DEBUG: print(f"DEBUG: software='{software}'")
-
- software = strip_until(software, " - ")
-
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
- return software
-
-def strip_until(software: str, until: str) -> str:
- # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
- if not isinstance(software, str):
- raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
- elif software == "":
- raise ValueError("Parameter 'software' is empty")
- elif not isinstance(until, str):
- raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'")
- elif until == "":
- raise ValueError("Parameter 'until' is empty")
- elif not until in software:
- print(f"WARNING: Cannot find '{until}' in '{software}'!")
- return software
-
- # Next, strip until part
- end = software.find(until)
-
- # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
- if end > 0:
- software = software[0:end].strip()
-
- # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
- return software
-
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of a domain name.

    The domain is encoded as UTF-8 before hashing.

    Args:
        domain: Non-empty domain name.

    Returns:
        The 64-character lowercase hex digest.

    Raises:
        ValueError: If domain is not a str or is an empty string.
    """
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    if domain == "":
        raise ValueError("Parameter 'domain' is empty")

    digest = hashlib.sha256()
    digest.update(domain.encode("utf-8"))
    return digest.hexdigest()
-def log_error(domain: str, error: dict):
- # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif config.get("write_error_log").lower() != "true":
- # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
- return
-
- # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
- if isinstance(error, BaseException) or isinstance(error, json.decoder.JSONDecodeError):
- error = f"error[{type(error)}]='{str(error)}'"
-
- # DEBUG: print("DEBUG: AFTER error[]:", type(error))
- if isinstance(error, str):
- cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
- domain,
- error,
- time.time()
- ])
- else:
- cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
- domain,
- error["status_code"],
- error["error_message"],
- time.time()
- ])
-
- # Cleanup old entries
- # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
- cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
-
- # DEBUG: print("DEBUG: EXIT!")
-
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
# DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
if not isinstance(url, str):
@@ -228,9 +62,72 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
# Invoke other function, avoid trailing ?
# DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
if components.query != "":
- response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
+ response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
else:
- response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
+ response = network.fetch_response(components.netloc, f"{components.path}", headers, timeout)
# DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
return response
+
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """Check a (possibly obscured) domain and fetch its instances.

    A domain containing the wildcard character "*" or "?" is first passed to
    instances.deobscure(); the first element of the returned row replaces the
    domain. The domain is then skipped when validators.domain() rejects it,
    when it ends with ".arpa", when blacklist.is_blacklisted() or
    instances.is_recent() returns a true value. Otherwise
    federation.fetch_instances() is invoked for it.

    Args:
        domain: Domain name to process, may contain "*" or "?" wildcards.
        blocker: Passed through to federation.fetch_instances().
        command: Name of the invoking command, passed through to
            federation.fetch_instances() and used in log output.

    Returns:
        True if federation.fetch_instances() returned without raising one of
        network.exceptions, False if the domain was skipped or fetching
        failed with such an exception.

    Raises:
        ValueError: If any parameter is not a str or is an empty string.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    # "*" takes precedence over "?"; only the first wildcard kind found is
    # used. Membership is tested (not find() > 0) so that a wildcard in the
    # very first position, e.g. "*.example.com", is de-obscured as well.
    for wildcard in ("*", "?"):
        if wildcard not in domain:
            continue

        # Try to de-obscure it
        row = instances.deobscure(wildcard, domain)

        # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
        if row is None:
            print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
            return False

        # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
        domain = row[0]
        break

    if not validators.domain(domain):
        print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
        return False
    elif domain.endswith(".arpa"):
        print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
        return False
    elif blacklist.is_blacklisted(domain):
        # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
        return False
    elif instances.is_recent(domain):
        # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
        return False

    processed = False
    try:
        print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
        federation.fetch_instances(domain, blocker, None, command)
        processed = True
    except network.exceptions as exception:
        # Report the actual invoking command instead of a hard-coded name
        print(f"WARNING: Exception '{type(exception)}' during fetching instances ({command}) from domain='{domain}'")
        instances.set_last_error(domain, exception)

    # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")
    return processed