]> git.mxchange.org Git - fba.git/blobdiff - fba/utils.py
Continued:
[fba.git] / fba / utils.py
index fcc12424bd5b063c9f8ab88ad6f7cc6112084ead..30f3a1c0b43d019173ec068cc58f158f5e085bb9 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2023 Free Software Foundation
+       # Copyright (C) 2023 Free Software Foundation
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published
@@ -23,13 +23,13 @@ import requests
 import validators
 
 from fba.helpers import blacklist
-from fba.helpers import cookies
 from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import federation
 from fba.http import network
 
+from fba.models import blocks
 from fba.models import instances
 
 logging.basicConfig(level=logging.INFO)
@@ -38,76 +38,62 @@ logger = logging.getLogger(__name__)
 ##### Other functions #####
 
 def is_primitive(var: any) -> bool:
-    logger.debug(f"var[]='{type(var)}' - CALLED!")
-    return type(var) in {int, str, float, bool} or var is None
+    logger.debug("var[]='%s' - CALLED!", type(var))
+    return type(var) in {int, str, float, bool, None} or var is None
 
 def get_hash(domain: str) -> str:
-    logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+    logger.debug("domain='%s' - CALLED!", domain)
     domain_helper.raise_on(domain)
 
     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
 
 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
-    logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
+    logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
     if not isinstance(url, str):
-        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+        raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
     elif url == "":
         raise ValueError("Parameter 'url' is empty")
     elif not isinstance(headers, dict):
-        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
+        raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
     elif not isinstance(timeout, tuple):
-        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
+        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
 
-    logger.debug(f"Parsing url='{url}'")
+    logger.debug("Parsing url='%s' ...", url)
     components = urlparse(url)
 
     # Invoke other function, avoid trailing ?
-    logger.debug(f"components[{type(components)}]={components}")
+    logger.debug("components[%s]='%s'", type(components), components)
     if components.query != "":
         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
     else:
         response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
 
-    logger.debug(f"response[]='{type(response)}' - EXXIT!")
+    logger.debug("response[]='%s' - EXIT!", type(response))
     return response
 
 def process_domain(domain: str, blocker: str, command: str) -> bool:
-    logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
+    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
     domain_helper.raise_on(domain)
     domain_helper.raise_on(blocker)
+
     if not isinstance(command, str):
-        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
+        raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
     elif command == "":
         raise ValueError("Parameter 'command' is empty")
 
-    if domain.find("*") > 0:
-        # Try to de-obscure it
-        row = instances.deobscure("*", domain)
-
-        logger.debug(f"row[{type(row)}]='{row}'")
-        if row is None:
-            logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
-            return False
-
-        logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
-        domain = row[0]
-    elif domain.find("?") > 0:
-        # Try to de-obscure it
-        row = instances.deobscure("?", domain)
-
-        logger.debug(f"row[{type(row)}]='{row}'")
-        if row is None:
-            logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
-            return False
+    logger.debug("domain='%s' - BEFORE!", domain)
+    domain = deobfuscate_domain(domain, blocker)
 
-        logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
-        domain = row[0]
+    logger.debug("domain='%s' - DEOBFUSCATED!", domain)
+    if instances.has_pending(blocker):
+        logger.debug("Flushing updates for blocker='%s' ...", blocker)
+        instances.update_data(blocker)
 
     if not is_domain_wanted(domain):
         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
         return False
     elif instances.is_recent(domain):
-        logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
+        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
         return False
 
     processed = False
@@ -115,26 +101,29 @@ def process_domain(domain: str, blocker: str, command: str) -> bool:
         logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
         federation.fetch_instances(domain, blocker, None, command)
         processed = True
-
-        logger.debug("Invoking cookies.clear(%s) ...", domain)
-        cookies.clear(domain)
     except network.exceptions as exception:
-        logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
+        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
         instances.set_last_error(domain, exception)
 
-    logger.debug(f"processed='{processed}' - EXIT!")
+    logger.debug("Checking if domain='%s' has pending updates ...", domain)
+    if instances.has_pending(domain):
+        logger.debug("Flushing updates for domain='%s' ...", domain)
+        instances.update_data(domain)
+
+    logger.debug("processed='%s' - EXIT!", processed)
     return processed
 
 def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
     logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
     if not isinstance(tags, bs4.element.ResultSet):
-        raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
+        raise ValueError(f"Parameter tags[]='{type(tags)}' is not of type 'ResultSet'")
     elif not isinstance(search, str):
-        raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
+        raise ValueError(f"Parameter search[]='{type(search)}' is not of type 'str'")
     elif search == "":
         raise ValueError("Parameter 'search' is empty")
 
     domains = list()
+    logger.debug("Parsing %d tags ...", len(tags))
     for tag in tags:
         logger.debug("tag[]='%s'", type(tag))
         domain = tidyup.domain(tag.find(search).contents[0])
@@ -145,7 +134,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
             domain = tidyup.domain(tag.find("em").contents[0])
 
         if not is_domain_wanted(domain):
-            logger.debug("domain='%s' is not wanted - SKIPPED!")
+            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
             continue
 
         logger.debug("Appending domain='%s'", domain)
@@ -154,12 +143,12 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
     logger.debug("domains()=%d - EXIT!", len(domains))
     return domains
 
-def is_domain_wanted (domain: str) -> bool:
+def is_domain_wanted(domain: str) -> bool:
     logger.debug("domain='%s' - CALLED!", domain)
 
     wanted = True
     if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
     elif domain == "":
         raise ValueError("Parameter 'domain' is empty")
     elif domain.lower() != domain:
@@ -170,12 +159,108 @@ def is_domain_wanted (domain: str) -> bool:
     elif domain.endswith(".arpa"):
         logger.debug("domain='%s' is a domain for reversed IP addresses - settings False ...", domain)
         wanted = False
+    elif domain.endswith(".onion"):
+        logger.debug("domain='%s' is a TOR .onion domain - settings False ...", domain)
+        wanted = False
     elif domain.endswith(".tld"):
         logger.debug("domain='%s' is a fake domain - settings False ...", domain)
         wanted = False
     elif blacklist.is_blacklisted(domain):
         logger.debug("domain='%s' is blacklisted - settings False ...", domain)
         wanted = False
+    elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
+        logger.debug("domain='%s' is a single user", domain)
+        wanted = False
+    elif domain.find("/tag/") > 0:
+        logger.debug("domain='%s' is a tag", domain)
+        wanted = False
 
     logger.debug("wanted='%s' - EXIT!", wanted)
     return wanted
+
+def deobfuscate_domain(domain: str, blocker: str, domain_hash: str = None) -> str:
+    logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
+    domain_helper.raise_on(blocker)
+
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
+    elif domain == "":
+        raise ValueError("Parameter domain is empty")
+    elif not isinstance(domain_hash, str) and domain_hash is not None:
+        raise ValueError(f"Parameter domain_hash[]='{type(domain_hash)}' is not of type 'str'")
+
+    if domain.find("*") >= 0:
+        logger.debug("blocker='%s' uses obfuscated domains", blocker)
+
+        # Obscured domain name with no hash
+        row = instances.deobfuscate("*", domain, domain_hash)
+
+        logger.debug("row[]='%s'", type(row))
+        if row is not None:
+            logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
+            domain = row["domain"]
+        else:
+            logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
+            instances.set_has_obfuscation(blocker, True)
+    elif domain.find("?") >= 0:
+        logger.debug("blocker='%s' uses obfuscated domains", blocker)
+
+        # Obscured domain name with no hash
+        row = instances.deobfuscate("?", domain, domain_hash)
+
+        logger.debug("row[]='%s'", type(row))
+        if row is not None:
+            logger.debug("domain='%s' de-obscured to '%s'", domain, row["domain"])
+            domain = row["domain"]
+        else:
+            logger.warning("blocker='%s' has domain='%s' that cannot be deobfuscated.", blocker, domain)
+            instances.set_has_obfuscation(blocker, True)
+    else:
+        logger.debug("domain='%s' is not obfuscated", domain)
+
+    logger.debug("domain='%s' - EXIT!", domain)
+    return domain
+
+def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
+    logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
+    domain_helper.raise_on(blocker)
+    domain_helper.raise_on(blocked)
+
+    added = False
+    if not isinstance(reason, str) and reason is not None:
+        raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
+    elif not isinstance(block_level, str):
+        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
+    elif block_level == "":
+        raise ValueError("Parameter block_level is empty")
+
+    if not blocks.is_instance_blocked(blocker, blocked, block_level):
+        logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
+        blocks.add_instance(blocker, blocked, reason, block_level)
+        added = True
+    else:
+        logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
+        blocks.update_last_seen(blocker, blocked, block_level)
+
+    logger.debug("added='%s' - EXIT!", added)
+    return added
+
+def alias_block_level(block_level: str) -> str:
+    logger.debug("block_level='%s' - CALLED!", block_level)
+    if not isinstance(block_level, str):
+        raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
+    elif block_level == "":
+        raise ValueError("Parameter 'block_level' is empty")
+
+    if block_level == "silence":
+        logger.debug("Block level 'silence' has been changed to 'silenced'")
+        block_level = "silenced"
+    elif block_level == "suspend":
+        logger.debug("Block level 'suspend' has been changed to 'suspended'")
+        block_level = "suspended"
+    elif block_level == "nsfw":
+        logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
+        block_level = "media_nsfw"
+
+    logger.debug("block_level='%s' - EXIT!", block_level)
+    return block_level