import requests
import validators
+from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import cookies
from fba.helpers import domain as domain_helper
# Depth counter, being raised and lowered
_DEPTH = 0
+# API paths
+_api_paths = [
+ "/api/v1/instance/peers",
+ "/api/v3/site",
+]
+
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.debug("domain='%s',origin='%s',software='%s',command='%s',path='%s',_DEPTH=%d - CALLED!", domain, origin, software, command, path, _DEPTH)
domain_helper.raise_on(domain)
- if not isinstance(origin, str) and origin is not None:
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function was invoked")
+ elif not isinstance(origin, str) and origin is not None:
raise ValueError(f"Parameter origin[]='{type(origin)}' is not of type 'str'")
elif not isinstance(command, str):
raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
raise ValueError(f"Parameter command='{command}' but origin is None, please fix invoking this function.")
elif not isinstance(path, str) and path is not None:
raise ValueError(f"Parameter path[]='{type(path)}' is not of type 'str'")
+ elif path is not None and not path.startswith("/"):
+ raise ValueError(f"path='{path}' does not start with a slash")
elif _DEPTH > 0 and instances.is_recent(domain, "last_instance_fetch"):
raise ValueError(f"domain='{domain}' has recently been fetched but function was invoked")
elif software is None and not instances.is_recent(domain, "last_nodeinfo"):
try:
- logger.debug("Software for domain='%s' is not set, determining ...", domain)
+ logger.debug("Software for domain='%s',path='%s' is not set, determining ...", domain, path)
software = determine_software(domain, path)
except network.exceptions as exception:
logger.warning("Exception '%s' during determining software type", type(exception))
logger.info("Checking %d instance(s) from domain='%s',software='%s',depth=%d ...", len(peerlist), domain, software, _DEPTH)
for instance in peerlist:
logger.debug("instance[%s]='%s'", type(instance), instance)
- if instance is None or instance == "":
+ if instance in [None, ""]:
logger.debug("instance[%s]='%s' is either None or empty - SKIPPED!", type(instance), instance)
continue
instance = tidyup.domain(instance) if isinstance(instance, str) and instance != "" else None
logger.debug("instance='%s' - AFTER!", instance)
- if instance is None or instance == "":
+ if instance in [None, ""]:
logger.warning("instance='%s' is empty after tidyup.domain(), domain='%s'", instance, domain)
continue
elif ".." in instance:
logger.debug("domain='%s',software='%s',origin='%s' - CALLED!", domain, software, origin)
domain_helper.raise_on(domain)
- if not isinstance(software, str) and software is not None:
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function was invoked")
+ elif not isinstance(software, str) and software is not None:
raise ValueError(f"Parameter software[]='{type(software)}' is not of type 'str'")
elif isinstance(software, str) and software == "":
raise ValueError("Parameter 'software' is empty")
logger.debug("Returning empty list ... - EXIT!")
return list()
- paths = [
- "/api/v1/instance/peers",
- "/api/v3/site",
- ]
-
# Init peers variable
peers = list()
- logger.debug("Checking %d paths ...", len(paths))
- for path in paths:
+ logger.debug("Checking %d API paths ...", len(_api_paths))
+ for path in _api_paths:
logger.debug("Fetching path='%s' from domain='%s',software='%s' ...", path, domain, software)
data = network.get_json_api(
domain,
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug("data[]='%s'", type(data))
+ logger.debug("data(%d)[]='%s'", len(data), type(data))
if "error_message" in data:
logger.debug("Was not able to fetch peers from path='%s',domain='%s' ...", path, domain)
instances.set_last_error(domain, data)
logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
domain_helper.raise_on(domain)
- if not isinstance(path, str):
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function was invoked")
+ elif not isinstance(path, str):
raise ValueError(f"path[]='{type(path)}' is not of type 'str'")
elif path == "":
raise ValueError("Parameter 'path' is empty")
+ elif not path.startswith("/"):
+ raise ValueError(f"path='{path}' does not start with / but should")
software = None
logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
domain_helper.raise_on(domain)
- if not isinstance(path, str) and path is not None:
+ if blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function was invoked")
+ elif not isinstance(path, str) and path is not None:
raise ValueError(f"Parameter path[]='{type(path)}' is not of type 'str'")
+ elif path is not None and not path.startswith("/"):
+ raise ValueError(f"path='{path}' does not start with a slash")
logger.debug("Fetching nodeinfo from domain='%s',path='%s' ...", domain, path)
data = nodeinfo.fetch(domain, path)
logger.debug("Generator for domain='%s' is: '%s'", domain, software)
logger.debug("software[%s]='%s'", type(software), software)
- if software is None or software == "":
+ if software in [None, ""]:
logger.debug("Returning None - EXIT!")
return None
logger.debug("Adding %d peer(s) to peers list ...", len(rows[key]))
for peer in rows[key]:
logger.debug("peer[%s]='%s' - BEFORE!", type(peer), peer)
- if peer is None or peer == "":
+ if peer in [None, ""]:
logger.debug("peer is empty - SKIPPED")
continue
elif isinstance(peer, dict) and "domain" in peer:
if not instances.is_registered(domain):
raise Exception(f"domain='{domain}' is not registered but function is invoked.")
+ elif blacklist.is_blacklisted(domain):
+ raise Exception(f"domain='{domain}' is blacklisted but function was invoked")
# Init block list
blocklist = list()
)
rows = list()
- logger.debug("data[]='%s'", type(data))
+ logger.debug("data(%d)[]='%s'", len(data), type(data))
if "error_message" in data:
logger.debug("Was not able to fetch domain_blocks from domain='%s': status_code=%d,error_message='%s'", domain, data['status_code'], data['error_message'])
instances.set_last_error(domain, data)
logger.debug("Marking domain='%s' as successfully handled ...", domain)
instances.set_success(domain)
- logger.debug("rows[%s]()=%d", type(rows), len(rows))
+ logger.debug("rows(%d)[]='%s'", len(rows), type(rows))
if len(rows) > 0:
logger.debug("Checking %d entries from domain='%s' ...", len(rows), domain)
for block in rows:
reason = tidyup.reason(block["comment"]) if "comment" in block and block["comment"] is not None and block["comment"] != "" else None
- logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s'", domain, block["domain"], reason, block["severity"])
+ logger.debug("Appending blocker='%s',blocked='%s',reason='%s',block_level='%s' ...", domain, block["domain"], reason, block["severity"])
blocklist.append({
"blocker" : domain,
"blocked" : block["domain"],