logger.info("Checking %d entries from blocker='%s',software='%s' ...", len(blocking), blocker, software)
for block in blocking:
- logger.debug("blocked='%s',block_level='%s',reason='%s'", block['blocked'], block['block_level'], block['reason'])
+ logger.debug("blocked='%s',block_level='%s',reason='%s'", block["blocked"], block['block_level'], block["reason"])
if block['block_level'] == "":
- logger.warning("block_level is empty, blocker='%s',blocked='%s'", block['blocker'], block['blocked'])
+ logger.warning("block_level is empty, blocker='%s',blocked='%s'", block["blocker"], block["blocked"])
continue
- logger.debug("blocked='%s',reason='%s' - BEFORE!", block['blocked'], block['reason'])
- block['blocked'] = tidyup.domain(block['blocked'])
- block['reason'] = tidyup.reason(block['reason']) if block['reason'] is not None and block['reason'] != "" else None
- logger.debug("blocked='%s',reason='%s' - AFTER!", block['blocked'], block['reason'])
+ logger.debug("blocked='%s',reason='%s' - BEFORE!", block["blocked"], block["reason"])
+ block["blocked"] = tidyup.domain(block["blocked"])
+ block["reason"] = tidyup.reason(block["reason"]) if block["reason"] is not None and block["reason"] != "" else None
+ logger.debug("blocked='%s',reason='%s' - AFTER!", block["blocked"], block["reason"])
- if block['blocked'] == "":
+ if block["blocked"] == "":
logger.warning("blocked is empty, blocker='%s'", blocker)
continue
- elif block['blocked'].count("*") > 0:
+ elif block["blocked"].count("*") > 0:
logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
instances.set_has_obfuscation(blocker, True)
# Some friendica servers also obscure domains without hash
- row = instances.deobfuscate("*", block['blocked'])
+ row = instances.deobfuscate("*", block["blocked"], block["hash"] if "hash" in block else None)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfuscate blocked='%s',blocker='%s',software='%s' - SKIPPED!", block['blocked'], blocker, software)
+ logger.warning("Cannot deobfuscate blocked='%s',blocker='%s',software='%s' - SKIPPED!", block["blocked"], blocker, software)
continue
- block['blocked'] = row[0]
+ block["blocked"] = row[0]
origin = row[1]
nodeinfo_url = row[2]
- elif block['blocked'].count("?") > 0:
+ elif block["blocked"].count("?") > 0:
logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
instances.set_has_obfuscation(blocker, True)
# Some obscure them with question marks, not sure if that's dependent on version or not
- row = instances.deobfuscate("?", block['blocked'])
+ row = instances.deobfuscate("?", block["blocked"] if "hash" in block else None)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning("Cannot deobfuscate blocked='%s',blocker='%s',software='%s' - SKIPPED!", block['blocked'], blocker, software)
+ logger.warning("Cannot deobfuscate blocked='%s',blocker='%s',software='%s' - SKIPPED!", block["blocked"], blocker, software)
continue
- block['blocked'] = row[0]
+ block["blocked"] = row[0]
origin = row[1]
nodeinfo_url = row[2]
- logger.debug("Looking up instance by domainm, blocked='%s'", block['blocked'])
- if not utils.is_domain_wanted(block['blocked']):
- logger.debug("blocked='%s' is not wanted - SKIPPED!", block['blocked'])
+ logger.debug("Looking up instance by domainm, blocked='%s'", block["blocked"])
+ if not utils.is_domain_wanted(block["blocked"]):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
continue
elif block['block_level'] in ["accept", "accepted"]:
- logger.debug("blocked='%s' is accepted, not wanted here - SKIPPED!", block['blocked'])
+ logger.debug("blocked='%s' is accepted, not wanted here - SKIPPED!", block["blocked"])
continue
- elif not instances.is_registered(block['blocked']):
- logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", block['blocked'], blocker)
- federation.fetch_instances(block['blocked'], blocker, None, inspect.currentframe().f_code.co_name)
+ elif not instances.is_registered(block["blocked"]):
+ logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", block["blocked"], blocker)
+ federation.fetch_instances(block["blocked"], blocker, None, inspect.currentframe().f_code.co_name)
if block['block_level'] == "silence":
logger.debug("Block level 'silence' has been changed to 'silenced'")
logger.debug("Block level 'suspend' has been changed to 'suspended'")
block['block_level'] = "suspended"
- if not blocks.is_instance_blocked(blocker, block['blocked'], block['block_level']):
- logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s)", blocker, block['blocked'], block['reason'], block['block_level'])
- blocks.add_instance(blocker, block['blocked'], block['reason'], block['block_level'])
+ if not blocks.is_instance_blocked(blocker, block["blocked"], block['block_level']):
+ logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s)", blocker, block["blocked"], block["reason"], block['block_level'])
+ blocks.add_instance(blocker, block["blocked"], block["reason"], block['block_level'])
logger.debug("block_level='%s',config[bot_enabled]=%s", block['block_level'], config.get("bot_enabled"))
if block['block_level'] == "reject" and config.get("bot_enabled"):
- logger.debug("blocker='%s' has blocked '%s' with reason='%s' - Adding to bot notification ...", blocker, block['blocked'], block['reason'])
+ logger.debug("blocker='%s' has blocked '%s' with reason='%s' - Adding to bot notification ...", blocker, block["blocked"], block["reason"])
blockdict.append({
- "blocked": block['blocked'],
- "reason" : block['reason'],
+ "blocked": block["blocked"],
+ "reason" : block["reason"],
})
else:
- logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, block['blocked'])
- blocks.update_last_seen(blocker, block['blocked'], block['block_level'])
- blocks.update_reason(block['reason'], blocker, block['blocked'], block['block_level'])
+ logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, block["blocked"])
+ blocks.update_last_seen(blocker, block["blocked"], block['block_level'])
+ blocks.update_reason(block["reason"], blocker, block["blocked"], block['block_level'])
- logger.debug("Invoking cookies.clear(%s) ...", block['blocked'])
- cookies.clear(block['blocked'])
+ logger.debug("Invoking cookies.clear(%s) ...", block["blocked"])
+ cookies.clear(block["blocked"])
if instances.has_pending(blocker):
logger.debug("Invoking instances.update_data(%s) ...", blocker)
for block in blocklists:
# Is domain given and not equal blocker?
if isinstance(args.domain, str) and args.domain != block["blocker"]:
- logger.debug("Skipping blocker='%s', not matching args.domain='%s'", block['blocker'], args.domain)
+ logger.debug("Skipping blocker='%s', not matching args.domain='%s'", block["blocker"], args.domain)
continue
elif args.domain in domains:
logger.debug("args.domain='%s' already handled - SKIPPED!", args.domain)
continue
# Fetch this URL
- logger.info("Fetching csv_url='%s' for blocker='%s' ...", block['csv_url'], block['blocker'])
+ logger.info("Fetching csv_url='%s' for blocker='%s' ...", block['csv_url'], block["blocker"])
response = utils.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
logger.info("Checking %d text file(s) ...", len(urls))
for row in urls:
- logger.debug("Fetching row[url]='%s' ...", row['url'])
- response = utils.fetch_url(row['url'], network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ logger.debug("Fetching row[url]='%s' ...", row["url"])
+ response = utils.fetch_url(row["url"], network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "":
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
- logger.debug("domain='%s',row[blocker]='%s'", domain, row['blocker'])
- processed = utils.process_domain(domain, row['blocker'], inspect.currentframe().f_code.co_name)
+ logger.debug("domain='%s',row[blocker]='%s'", domain, row["blocker"])
+ processed = utils.process_domain(domain, row["blocker"], inspect.currentframe().f_code.co_name)
logger.debug("processed='%s'", processed)
if not processed: