instances.set_total_blocks(blocker, blocking)
blockdict = []
+
+ logger.info("Checking %d block lists ...", len(blocklist))
for block_level in blocklist:
logger.debug("Checking %d blocker entries for block_level='%s' ...", len(blocklist[block_level]), block_level)
for blocked in blocklist[block_level]:
logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
+# "Cache" configuration get() invocations
+_allow_i2p_domain = config.get("allow_i2p_domain")
+
def raise_on(domain: str) -> None:
logger.debug("domain='%s' - CALLED!", domain)
raise ValueError(f"domain='{domain}' is not a valid domain")
elif domain.endswith(".onion"):
raise ValueError(f"domain='{domain}' is a TOR, please don't crawl them!")
- elif domain.endswith(".i2p") and not config.get("allow_i2p_domain"):
+ elif domain.endswith(".i2p") and not _allow_i2p_domain:
raise ValueError(f"domain='{domain}' is an I2P, please don't crawl them!")
elif domain.endswith(".arpa"):
raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
if domain.endswith(".onion"):
logger.debug("domain='%s' is a TOR .onion domain - setting wanted=False ...", domain)
wanted = False
- elif domain.endswith(".i2p") and not config.get("allow_i2p_domain"):
+ elif domain.endswith(".i2p") and not _allow_i2p_domain:
logger.debug("domain='%s' is an I2P .onion domain - setting wanted=False ...", domain)
wanted = False
elif domain.endswith(".arpa"):
logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
+# "Cache" configuration get() invocations
+_bot_enabled = config.get("bot_enabled")
+
def instance(blocked: str, blocker: str, command: str, force: bool = False) -> bool:
logger.debug("blocked='%s',blocker='%s',command='%s',force='%s' - CALLED!", blocked, blocker, command, force)
domain_helper.raise_on(blocked)
processed = instance(domain, blocker, command)
logger.debug("processed='%s'", processed)
- if block(blocker, domain, reason, severity) and config.get("bot_enabled"):
+ if block(blocker, domain, reason, severity) and _bot_enabled:
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", domain, reason, blocker)
blockdict.append({
"blocked": domain,
logger.debug("Flushing updates for blocker='%s' ...", blocker)
instances.update(blocker)
- logger.debug("config.get(bot_enabled)='%s',blockdict()=%d", config.get("bot_enabled"), len(blockdict))
- if config.get("bot_enabled") and len(blockdict) > 0:
+ logger.debug("config.get(bot_enabled)='%s',blockdict()=%d", _bot_enabled, len(blockdict))
+ if _bot_enabled and len(blockdict) > 0:
logger.info("Sending bot POST for blocker='%s',blockdict()=%d ...", blocker, len(blockdict))
network.send_bot_post(blocker, blockdict)
#logger.setLevel(logging.DEBUG)
# "Cached" configuration values
-_write_error_log = config.get("write_error_log")
+_write_error_log = config.get("write_error_log")
+_error_log_cleanup = config.get("error_log_cleanup")
def add(domain: str, error: dict) -> None:
logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error))
])
# Cleanup old entries
- logger.debug("Purging old records (distance: %d)", config.get('error_log_cleanup'))
- database.cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
+ logger.debug("Purging old records (distance: %d)", _error_log_cleanup)
+ database.cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - _error_log_cleanup])
logger.debug("EXIT!")