-# Copyright (C) 2023 Free Software Foundation
+ # Copyright (C) 2023 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
from fba.http import federation
from fba.http import network
+from fba.models import blocks
from fba.models import instances
logging.basicConfig(level=logging.INFO)
def is_primitive(var: any) -> bool:
logger.debug("var[]='%s' - CALLED!", type(var))
- return type(var) in {int, str, float, bool} or var is None
+ return type(var) in {int, str, float, bool, None} or var is None
def get_hash(domain: str) -> str:
logger.debug("domain='%s' - CALLED!", domain)
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
if not isinstance(url, str):
- raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+ raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
elif url == "":
raise ValueError("Parameter 'url' is empty")
elif not isinstance(headers, dict):
- raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
+ raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
elif not isinstance(timeout, tuple):
- raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
+ raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
logger.debug("Parsing url='%s' ...", url)
components = urlparse(url)
domain_helper.raise_on(blocker)
if not isinstance(command, str):
- raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
+ raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
elif command == "":
raise ValueError("Parameter 'command' is empty")
- logger.debug("domain='%s' - BEFORE!")
- if domain.find("*") > 0:
- logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
- instances.set_has_obfuscation(blocker, True)
-
- # Try to de-obscure it
- row = instances.deobfuscate("*", domain)
-
- logger.debug("row[%s]='%s'", type(row), row)
- if row is None:
- logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
- return False
-
- logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
- domain = row[0]
- elif domain.find("?") > 0:
- logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
- instances.set_has_obfuscation(blocker, True)
-
- # Try to de-obscure it
- row = instances.deobfuscate("?", domain)
-
- logger.debug("row[%s]='%s'", type(row), row)
- if row is None:
- logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
- return False
-
- logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
- domain = row[0]
- else:
- logger.debug("blocker='%s' has NO obfuscation on their block list", blocker)
- instances.set_has_obfuscation(blocker, False)
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = deobfuscate_domain(domain, blocker)
logger.debug("domain='%s' - DEOBFUSCATED!", domain)
if instances.has_pending(blocker):
federation.fetch_instances(domain, blocker, None, command)
processed = True
except network.exceptions as exception:
- logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
+ logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
instances.set_last_error(domain, exception)
- logger.debug("Checking if domain='%s' has pending updates ...")
+ logger.debug("Checking if domain='%s' has pending updates ...", domain)
if instances.has_pending(domain):
- logger.debug("Flushing updates for domain='%s' ...")
+ logger.debug("Flushing updates for domain='%s' ...", domain)
instances.update_data(domain)
logger.debug("processed='%s' - EXIT!", processed)
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
if not isinstance(tags, bs4.element.ResultSet):
- raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
+ raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
elif not isinstance(search, str):
- raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
+ raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
elif search == "":
raise ValueError("Parameter 'search' is empty")
domain = tidyup.domain(tag.find("em").contents[0])
if not is_domain_wanted(domain):
- logger.debug("domain='%s' is not wanted - SKIPPED!")
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
logger.debug("Appending domain='%s'", domain)
wanted = True
if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
elif domain.lower() != domain:
elif domain.endswith(".arpa"):
logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
wanted = False
+ elif domain.endswith(".onion"):
+ logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
+ wanted = False
elif domain.endswith(".tld"):
logger.debug("domain='%s' is a fake domain - settings False ...", domain)
wanted = False
logger.debug("domain='%s' is blacklisted - settings False ...", domain)
wanted = False
elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
- loger.debug("domain='%s' is a single user", domain)
+ logger.debug("domain='%s' is a single user", domain)
wanted = False
elif domain.find("/tag/") > 0:
logger.debug("domain='%s' is a tag", domain)
logger.debug("wanted='%s' - EXIT!", wanted)
return wanted
-def deobfuscate_domain(domain: str, blocker: str) -> str:
- logger.debug("domain='%s',blocker='%s' - CALLED!", domain, blocker)
- domain_helper.raise_on(domain)
+def deobfuscate_domain(domain: str, blocker: str, domain_hash: str = None) -> str:
+ logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
domain_helper.raise_on(blocker)
- if domain.count("*") > 0:
- logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
- instances.set_has_obfuscation(blocker, True)
+ if not isinstance(domain, str):
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
+ elif domain == "":
+ raise ValueError("Parameter domain is empty")
+ elif not isinstance(domain_hash, str) and domain_hash is not None:
+ raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
+
+ if domain.find("*") >= 0:
+ logger.debug("blocker='%s' uses obfuscated domains", blocker)
# Obscured domain name with no hash
- row = instances.deobfuscate("*", domain)
+ row = instances.deobfuscate("*", domain, domain_hash)
logger.debug("row[]='%s'", type(row))
if row is not None:
- logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
- domain = row[0]
- elif domain.count("?") > 0:
- logger.debug("blocker='%s' uses obfuscated domains, marking ...", blocker)
- instances.set_has_obfuscation(blocker, True)
+ logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
+ domain = row["domain"]
+ else:
+ logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
+ instances.set_has_obfuscation(blocker, True)
+ elif domain.find("?") >= 0:
+ logger.debug("blocker='%s' uses obfuscated domains", blocker)
# Obscured domain name with no hash
- row = instances.deobfuscate("?", domain)
+ row = instances.deobfuscate("?", domain, domain_hash)
logger.debug("row[]='%s'", type(row))
if row is not None:
- logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
- domain = row[0]
+ logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
+ domain = row["domain"]
+ else:
+ logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
+ instances.set_has_obfuscation(blocker, True)
else:
logger.debug("domain='%s' is not obfuscated", domain)
logger.debug("domain='%s' - EXIT!", domain)
return domain
+
+def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
+ logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
+ domain_helper.raise_on(blocker)
+ domain_helper.raise_on(blocked)
+
+ added = False
+ if not isinstance(reason, str) and reason is not None:
+ raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
+ elif not isinstance(block_level, str):
+ raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
+ elif block_level == "":
+ raise ValueError("Parameter block_level is empty")
+
+ if not blocks.is_instance_blocked(blocker, blocked, block_level):
+ logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
+ blocks.add_instance(blocker, blocked, reason, block_level)
+ added = True
+ else:
+ logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
+ blocks.update_last_seen(blocker, blocked, block_level)
+
+ logger.debug("added='%s' - EXIT!", added)
+ return added
+
+def alias_block_level(block_level: str) -> str:
+ logger.debug("block_level='%s' - CALLED!", block_level)
+ if not isinstance(block_level, str):
+ raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
+ elif block_level == "":
+ raise ValueError("Parameter 'block_level' is empty")
+
+ if block_level == "silence":
+ logger.debug("Block level 'silence' has been changed to 'silenced'")
+ block_level = "silenced"
+ elif block_level == "suspend":
+ logger.debug("Block level 'suspend' has been changed to 'suspended'")
+ block_level = "suspended"
+ elif block_level == "nsfw":
+ logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
+ block_level = "media_nsfw"
+
+ logger.debug("block_level='%s' - EXIT!", block_level)
+ return block_level