]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Wed, 21 Jun 2023 23:03:25 +0000 (01:03 +0200)
committerRoland Häder <roland@mxchange.org>
Wed, 21 Jun 2023 23:13:47 +0000 (01:13 +0200)
- introduced helper module for domains, need to be aliased to 'domain_helper'
  to avoid conflict with parameter 'domain'

17 files changed:
fba/csrf.py
fba/helpers/__init__.py
fba/helpers/blacklist.py
fba/helpers/cookies.py
fba/helpers/domain.py [new file with mode: 0644]
fba/http/federation.py
fba/http/network.py
fba/models/blocks.py
fba/models/error_log.py
fba/models/instances.py
fba/networks/friendica.py
fba/networks/lemmy.py
fba/networks/mastodon.py
fba/networks/misskey.py
fba/networks/peertube.py
fba/networks/pleroma.py
fba/utils.py

index 89abd0562e26076b55c62470a503080287a252b4..fc6f17b75280bc9c3a0c44d48f2c5a48f4809e85 100644 (file)
@@ -18,10 +18,10 @@ import logging
 
 import bs4
 import reqto
-import validators
 
 from fba.helpers import config
 from fba.helpers import cookies
+from fba.helpers import domain as domain_helper
 
 from fba.http import network
 
@@ -30,19 +30,8 @@ logger = logging.getLogger(__name__)
 
 def determine(domain: str, headers: dict) -> dict:
     logger.debug(f"domain='{domain}',headers()={len(headers)} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(headers, dict):
+    domain_helper.raise_on(domain)
+    if not isinstance(headers, dict):
         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
 
     # Default headers with no CSRF
index b332baef0fe1f4ee65d74c2851a1aa079f7108f0..bccc33f9a67e5ee7a7115f0f23512f32a9da6e20 100644 (file)
@@ -19,6 +19,7 @@ __all__ = [
     'config',
     'cookies',
     'dicts',
+    'domain',
     'locking',
     'tidyup',
     'version',
index ec76eb0fd888b00d581dd17e138f626d371b288a..b223773c11d12a929ae782b49fe3eed987dd090d 100644 (file)
@@ -16,7 +16,7 @@
 
 import logging
 
-import validators
+from fba.helpers import domain as domain_helper
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -43,18 +43,7 @@ blacklist = [
 
 def is_blacklisted(domain: str) -> bool:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     blacklisted = False
     for peer in blacklist:
index b40b68bd74bb60c1ec76715dd1e3156405dd07bc..c01294193ac1593e0b7171e3c2881381b7e1367b 100644 (file)
@@ -16,7 +16,7 @@
 
 import logging
 
-import validators
+from fba.helpers import domain as domain_helper
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -25,20 +25,9 @@ logger = logging.getLogger(__name__)
 _cookies = {}
 
 def store (domain: str, cookies: dict):
-    logger.debug(f"domain='{domain}',cookies()={len(cookies)} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(cookies, dict):
+    logger.debug("domain='%s',cookies()=%d - CALLED!", domain, len(cookies))
+    domain_helper.raise_on(domain)
+    if not isinstance(cookies, dict):
         raise ValueError(f"Parameter cookies[]='{type(cookies)}' is not 'dict'")
 
     _cookies[domain] = cookies
@@ -47,19 +36,8 @@ def store (domain: str, cookies: dict):
 
 def get_all(domain: str) -> dict:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not has(domain):
+    domain_helper.raise_on(domain)
+    if not has(domain):
         raise Exception(f"domain='{domain}' has no cookies stored, maybe invoke store() first?")
 
     logger.debug(f"_cookies[{domain}]()={len(_cookies[domain])} - EXIT!")
@@ -67,12 +45,7 @@ def get_all(domain: str) -> dict:
 
 def has (domain: str) -> bool:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+    domain_helper.raise_on(domain)
 
     has_cookies = domain in _cookies
 
@@ -81,18 +54,7 @@ def has (domain: str) -> bool:
 
 def clear (domain: str):
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     if has(domain):
         logger.debug(f"Removing cookies for domain='{domain}' ...")
diff --git a/fba/helpers/domain.py b/fba/helpers/domain.py
new file mode 100644 (file)
index 0000000..ac32cf1
--- /dev/null
@@ -0,0 +1,39 @@
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+
+import validators
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def raise_on(domain: str):
+    logger.debug("domain='%s' - CALLED!")
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif domain.lower() != domain:
+        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+    elif not validators.domain(domain.split("/")[0]):
+        raise ValueError(f"domain='{domain}' is not a valid domain")
+    elif domain.endswith(".arpa"):
+        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
+    elif domain.endswith(".tld"):
+        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+
+    logger.debug("EXIT!")
index e87b72655b69413c7935700801ec066f74e1981d..ac97163f5dc9657a3236a29f8e5e4e0aba43daf8 100644 (file)
@@ -24,6 +24,7 @@ from fba import csrf
 from fba import utils
 
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 from fba.helpers import version
 
@@ -52,19 +53,9 @@ nodeinfo_identifier = [
 
 def fetch_instances(domain: str, origin: str, software: str, command: str, path: str = None):
     logger.debug(f"domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(origin, str) and origin is not None:
+    domain_helper.raise_on(domain)
+
+    if not isinstance(origin, str) and origin is not None:
         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
     elif software is None:
         logger.debug(f"Updating last_instance_fetch for domain='{domain}' ...")
@@ -85,12 +76,6 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path:
         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
     elif command == "":
         raise ValueError("Parameter 'command' is empty")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain")
 
     if not instances.is_registered(domain):
         logger.debug(f"Adding new domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}'")
@@ -137,19 +122,9 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path:
 
 def fetch_peers(domain: str, software: str) -> list:
     logger.debug(f"domain({len(domain)})='{domain}',software='{software}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(software, str) and software is not None:
+    domain_helper.raise_on(domain)
+
+    if not isinstance(software, str) and software is not None:
         raise ValueError(f"software[]='{type(software)}' is not 'str'")
 
     if software == "misskey":
@@ -219,19 +194,9 @@ def fetch_peers(domain: str, software: str) -> list:
 
 def fetch_nodeinfo(domain: str, path: str = None) -> dict:
     logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(path, str) and path is not None:
+    domain_helper.raise_on(domain)
+
+    if not isinstance(path, str) and path is not None:
         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
 
     logger.debug("Fetching nodeinfo from domain='%s' ...", domain)
@@ -297,18 +262,7 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict:
 
 def fetch_wellknown_nodeinfo(domain: str) -> dict:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     # No CSRF by default, you don't have to add network.api_headers by yourself here
     headers = tuple()
@@ -381,19 +335,9 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict:
 
 def fetch_generator_from_path(domain: str, path: str = "/") -> str:
     logger.debug("domain(%d)='%s',path='%s' - CALLED!", len(domain), domain, path)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(path, str):
+    domain_helper.raise_on(domain)
+
+    if not isinstance(path, str):
         raise ValueError(f"path[]='{type(path)}' is not 'str'")
     elif path == "":
         raise ValueError("Parameter 'path' is empty")
@@ -458,19 +402,9 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str:
 
 def determine_software(domain: str, path: str = None) -> str:
     logger.debug("domain(%d)='%s',path='%s' - CALLED!", len(domain), domain, path)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(path, str) and path is not None:
+    domain_helper.raise_on(domain)
+
+    if not isinstance(path, str) and path is not None:
         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
 
     logger.debug("Determining software for domain,path:", domain, path)
index da936ac9af82d41fb3c74bcec065f8f9a531d98e..88a715ac4470c47680a54732157cfa8b818d91d0 100644 (file)
@@ -20,12 +20,12 @@ import json
 import reqto
 import requests
 import urllib3
-import validators
 
 from fba import utils
 
 from fba.helpers import config
 from fba.helpers import cookies
+from fba.helpers import domain as domain_helper
 
 from fba.models import instances
 
@@ -57,19 +57,9 @@ exceptions = (
 
 def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict:
     logger.debug(f"domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(path, str):
+    domain_helper.raise_on(domain)
+
+    if not isinstance(path, str):
         raise ValueError(f"path[]='{type(path)}' is not 'str'")
     elif path == "":
         raise ValueError("Parameter 'path' cannot be empty")
@@ -149,19 +139,9 @@ def fetch_api_url(url: str, timeout: tuple) -> dict:
 
 def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
     logger.debug(f"domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(path, str):
+    domain_helper.raise_on(domain)
+
+    if not isinstance(path, str):
         raise ValueError(f"path[]='{type(path)}' is not 'str'")
     elif path == "":
         raise ValueError("Parameter 'path' cannot be empty")
@@ -206,19 +186,9 @@ def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
 
 def send_bot_post(domain: str, blocklist: dict):
     logger.debug(f"domain='{domain}',blocklist()={len(blocklist)} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(blocklist, dict):
+    domain_helper.raise_on(domain)
+
+    if not isinstance(blocklist, dict):
         raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'")
 
     message = f"{domain} has blocked the following instances:\n\n"
@@ -259,19 +229,9 @@ def send_bot_post(domain: str, blocklist: dict):
 
 def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response:
     logger.debug(f"domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(path, str):
+    domain_helper.raise_on(domain)
+
+    if not isinstance(path, str):
         raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
     elif path == "":
         raise ValueError("Parameter 'path' is empty")
index d659d4f479014a2f20ae55b18be39976f68a7553..6e6de8348e69179dce3c44f44f88b99a5cb08c40 100644 (file)
 import logging
 
 import time
-import validators
 
 from fba import database
 
 from fba.helpers import blacklist
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 logging.basicConfig(level=logging.INFO)
@@ -132,26 +132,13 @@ def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
 
 def add_instance(blocker: str, blocked: str, reason: str, block_level: str):
     logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
-    if not isinstance(blocker, str):
-        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
-    elif blocker == "":
-        raise ValueError("Parameter 'blocker' is empty")
-    elif blocker.lower() != blocker:
-        raise ValueError(f"Parameter blocker='{blocker}' must be all lower-case")
-    elif not validators.domain(blocker.split("/")[0]):
-        raise ValueError(f"Bad blocker='{blocker}'")
-    elif not isinstance(blocked, str):
-        raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
-    elif blocked == "":
-        raise ValueError("Parameter 'blocked' is empty")
-    elif blocked.lower() != blocked:
-        raise ValueError(f"Parameter blocked='{blocked}' must be all lower-case")
-    elif not isinstance(block_level, str):
+    domain_helper.raise_on(blocker)
+    domain_helper.raise_on(blocked)
+
+    if not isinstance(block_level, str):
         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
     elif block_level == "":
         raise ValueError("Parameter 'block_level' is empty")
-    elif not validators.domain(blocked.split("/")[0]):
-        raise ValueError(f"Bad blocked='{blocked}'")
     elif blacklist.is_blacklisted(blocker):
         raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
     elif blacklist.is_blacklisted(blocked):
index 8e3b693ca07f58e750b98c0eca5c76e3337e8ee3..bfa27f2cbe9ec2c1e9e2ec9990558208d27e06af 100644 (file)
@@ -17,30 +17,18 @@ import logging
 import json
 import time
 
-import validators
-
 from fba import database
 
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 def add(domain: str, error: dict):
     logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error))
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif config.get("write_error_log").lower() != "true":
+    domain_helper.raise_on(domain)
+    if config.get("write_error_log").lower() != "true":
         logger.debug("Writing to error_log is disabled in configuruation file - EXIT!")
         return
 
index 09f1f821a6ebc8560ca1bf2a77efc6a089785517..7a4924d7d0b9b48b8e04b7dba9707f40079e696b 100644 (file)
@@ -27,6 +27,7 @@ from fba import utils
 from fba.helpers import blacklist
 from fba.helpers import cache
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 
 from fba.http import federation
 from fba.http import network
@@ -62,23 +63,12 @@ _pending = {
 }
 
 def _set_data(key: str, domain: str, value: any):
-    logger.debug(f"key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!")
+    logger.debug("key='%s',domain='%s',value[]='%s' - CALLED!", key, domain, type(value))
+    domain_helper.raise_on(domain)
     if not isinstance(key, str):
         raise ValueError("Parameter key[]='{type(key)}' is not 'str'")
     elif key == "":
         raise ValueError("Parameter 'key' is empty")
-    elif not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
     elif not key in _pending:
         raise ValueError(f"key='{key}' not found in _pending")
     elif not utils.is_primitive(value):
@@ -91,59 +81,37 @@ def _set_data(key: str, domain: str, value: any):
 
 def has_pending(domain: str) -> bool:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     has = False
     for key in _pending:
-        logger.debug(f"key='{key}',domain='{domain}',_pending[key]()='{len(_pending[key])}'")
+        logger.debug("key='%s',domain='%s',_pending[key]()=%d", key, domain, len(_pending[key]))
         if domain in _pending[key]:
             has = True
             break
 
-    logger.debug(f"has='{has}' - EXIT!")
+    logger.debug("has='%s' - EXIT!", has)
     return has
 
 def update_data(domain: str):
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not has_pending(domain):
+    domain_helper.raise_on(domain)
+    if not has_pending(domain):
         raise Exception(f"domain='{domain}' has no pending instance data, but function invoked")
     elif not is_registered(domain):
         raise Exception(f"domain='{domain}' cannot be updated while not being registered")
 
-    logger.debug(f"Updating instance data for domain='{domain}' ...")
+    logger.debug("Updating instance data for domain='%s' ...", domain)
     sql_string = ""
     fields = list()
     for key in _pending:
-        logger.debug("key:", key)
+        logger.debug("Checking key='%s',domain='%s'", key, domain)
         if domain in _pending[key]:
-            logger.debug(f"Adding '{_pending[key][domain]}' for key='{key}' ...")
+            logger.debug("Adding '%s' for key='%s' ...", _pending[key][domain], key)
             fields.append(_pending[key][domain])
             sql_string += f" {key} = ?,"
 
-    logger.debug(f"sql_string()={len(sql_string)}")
+    logger.debug("sql_string()=%d", len(sql_string))
     if sql_string == "":
         raise ValueError(f"No fields have been set, but method invoked, domain='{domain}'")
 
@@ -153,43 +121,32 @@ def update_data(domain: str):
     # For WHERE statement
     fields.append(domain)
 
-    logger.debug(f"sql_string='{sql_string}',fields()={len(fields)}")
+    logger.debug("sql_string='%s',fields()=%d", sql_string, len(fields))
     sql_string = "UPDATE instances SET" + sql_string + " last_updated = ? WHERE domain = ? LIMIT 1"
-    logger.debug("sql_string:", sql_string)
 
-    logger.debug("Executing SQL:", sql_string)
+    logger.debug("Executing SQL: '%s'", sql_string)
     database.cursor.execute(sql_string, fields)
 
-    logger.debug(f"Success! (rowcount={database.cursor.rowcount })")
+    logger.debug("rowcount=%d", database.cursor.rowcount)
     if database.cursor.rowcount == 0:
         raise Exception(f"Did not update any rows: domain='{domain}',fields()={len(fields)}")
 
     logger.debug("Invoking commit() ...")
     database.connection.commit()
 
-    logger.debug(f"Deleting _pending for domain='{domain}'")
+    logger.debug("Deleting _pending for domain='%s'", domain)
     for key in _pending:
-        logger.debug(f"domain='{domain}',key='{key}'")
+        logger.debug("domain='%s',key='%s'", domain, key)
         if domain in _pending[key]:
+            logger.debug("Deleting key='%s',domain='%s' ...", key, domain)
             del _pending[key][domain]
 
     logger.debug("EXIT!")
 
 def add(domain: str, origin: str, command: str, path: str = None, software: str = None):
-    logger.debug(f"domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(origin, str) and origin is not None:
+    logger.debug("domain='%s',origin='%s',command='%s',path='%s',software='%s' - CALLED!", domain, origin, command, path, software)
+    domain_helper.raise_on(domain)
+    if not isinstance(origin, str) and origin is not None:
         raise ValueError(f"origin[]='{type(origin)}' is not 'str'")
     elif origin == "":
         raise ValueError("Parameter 'origin' is empty")
@@ -197,8 +154,6 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str
         raise ValueError(f"command[]='{type(command)}' is not 'str'")
     elif command == "":
         raise ValueError("Parameter 'command' is empty")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"Bad domain name='{domain}'")
     elif not isinstance(path, str) and path is not None:
         raise ValueError(f"path[]='{type(path)}' is not 'str'")
     elif path == "":
@@ -207,20 +162,16 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str
         raise ValueError(f"software[]='{type(software)}' is not 'str'")
     elif software == "":
         raise ValueError("Parameter 'software' is empty")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"Please don't crawl .arpa domains: domain='{domain}'")
     elif origin is not None and not validators.domain(origin.split("/")[0]):
         raise ValueError(f"Bad origin name='{origin}'")
     elif blacklist.is_blacklisted(domain):
         raise Exception(f"domain='{domain}' is blacklisted, but method invoked")
     elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (software == "lemmy" and domain.find("/c/") > 0):
         raise Exception(f"domain='{domain}' is a single user")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
 
     if software is None:
         try:
-            logger.debug("domain,origin,command,path:", domain, origin, command, path)
+            logger.debug("domain='%s',origin='%s',command='%s',path='%s'", domain, origin, command, path)
             software = federation.determine_software(domain, path)
         except network.exceptions as exception:
             logger.warning("Exception '%s' during determining software type, domain='%s'", type(exception), domain)
@@ -233,7 +184,7 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str
             logger.warning("domain='%s' already registered after cutting off user part. - EXIT!", domain)
             return
 
-    logger.info("Adding instance domain='%s' (origin='%s',software='%s')", domain, origin, software)
+    logger.info("Adding instance domain='%s',origin='%s',software='%s',command='%s'", domain, origin, software, command)
     database.cursor.execute(
         "INSERT INTO instances (domain, origin, command, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
         (
@@ -246,96 +197,64 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str
         ),
     )
 
-    logger.debug(f"Marking domain='{domain}' as registered.")
+    logger.debug("Marking domain='%s' as registered.", domain)
     cache.set_sub_key("is_registered", domain, True)
 
     if has_pending(domain):
-        logger.debug(f"domain='{domain}' has pending nodeinfo being updated ...")
+        logger.debug("domain='%s' has pending nodeinfo being updated ...", domain)
         update_data(domain)
 
     logger.debug("EXIT!")
 
 def set_last_nodeinfo(domain: str):
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug("Updating last_nodeinfo for domain:", domain)
     _set_data("last_nodeinfo", domain, time.time())
 
     # Running pending updated
-    logger.debug(f"Invoking update_data({domain}) ...")
+    logger.debug("Invoking update_data(%s) ...", domain)
     update_data(domain)
 
     logger.debug("EXIT!")
 
 def set_last_error(domain: str, error: dict):
-    logger.debug("domain,error[]:", domain, type(error))
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-
-    logger.debug("BEFORE error[]:", type(error))
+    logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error))
+    domain_helper.raise_on(domain)
+
+    logger.debug("error[]='%s' - BEFORE!", type(error))
     if isinstance(error, (BaseException, json.decoder.JSONDecodeError)):
         error = f"error[{type(error)}]='{str(error)}'"
-    logger.debug("AFTER error[]:", type(error))
+    logger.debug("error[]='%s' - AFTER!", type(error))
 
     if isinstance(error, str):
-        logger.debug(f"Setting last_error_details='{error}'")
+        logger.debug("Setting last_error_details='%s' (str)", error)
         _set_data("last_status_code"  , domain, 999)
         _set_data("last_error_details", domain, error if error != "" else None)
     elif isinstance(error, requests.models.Response):
-        logger.debug(f"Setting last_error_details='{error.reason}'")
+        logger.debug("Setting last_error_details='%s' (Response)", error.reason)
         _set_data("last_status_code"  , domain, error.status_code)
         _set_data("last_error_details", domain, error.reason if error.reason != "" else None)
     elif not isinstance(error, dict):
         raise KeyError(f"Cannot handle keys in error[{type(error)}]='{error}'")
     elif "status_code" in error and "error_message" in error:
-        logger.debug(f"Setting last_error_details='{error['error_message']}'")
+        logger.debug("Setting last_error_details='%s' (error_message)", error['error_message'])
         _set_data("last_status_code"  , domain, error["status_code"])
         _set_data("last_error_details", domain, error["error_message"] if error["error_message"] != "" else None)
     elif "json" in error and "error" in error["json"]:
+        logger.debug("Setting last_error_details='%s' (json,error)", error["json"]["error"])
         _set_data("last_status_code"  , domain, error["status_code"])
         _set_data("last_error_details", domain, error["json"]["error"] if error["json"]["error"] != "" else None)
 
-    logger.debug(f"Invoking error_log.add(domain='{domain}',error[]='{type(error)}'")
+    logger.debug("Invoking error_log.add(domain='%s',error[]='%s'", domain, type(error))
     error_log.add(domain, error)
 
     logger.debug("EXIT!")
 
 def is_registered(domain: str) -> bool:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
     if not cache.key_exists("is_registered"):
@@ -348,24 +267,13 @@ def is_registered(domain: str) -> bool:
     # Is cache found?
     registered = cache.sub_key_exists("is_registered", domain)
 
-    logger.debug(f"registered='{registered}' - EXIT!")
+    logger.debug("registered='%s' - EXIT!", registered)
     return registered
 
 def is_recent(domain: str) -> bool:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not is_registered(domain):
+    domain_helper.raise_on(domain)
+    if not is_registered(domain):
         logger.debug(f"domain='{domain}' is not registered, returning False - EXIT!")
         return False
 
@@ -375,47 +283,37 @@ def is_recent(domain: str) -> bool:
     # Fetch row
     fetched = database.cursor.fetchone()[0]
 
-    logger.debug(f"fetched[{type(fetched)}]='{fetched}'")
+    logger.debug("fetched[%s]='%s'", type(fetched), fetched)
     recently = isinstance(fetched, float) and time.time() - fetched <= config.get("recheck_instance")
 
-    logger.debug(f"recently='{recently}' - EXIT!")
+    logger.debug("recently='%s' - EXIT!", recently)
     return recently
 
 def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple:
-    logger.debug(f"char='{char}',domain='{domain}',blocked_hash='{blocked_hash}' - CALLED!")
+    logger.debug("char='%s',domain='%s',blocked_hash='%s' - CALLED!", char, domain, blocked_hash)
+    domain_helper.raise_on(domain)
     if not isinstance(char, str):
         raise ValueError(f"Parameter char[]='{type(char)}' is not 'str'")
     elif char == "":
         raise ValueError("Parameter 'char' is empty")
-    elif not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
     elif not isinstance(blocked_hash, str) and blocked_hash is not None:
         raise ValueError(f"Parameter blocked_hash[]='{type(blocked_hash)}' is not 'str'")
 
+    logger.debug("blocked_hash[]='%s'", type(blocked_hash))
     if isinstance(blocked_hash, str):
-        logger.debug(f"Looking up blocked_hash='{blocked_hash}' ...")
+        logger.debug("Looking up blocked_hash='%s',domain='%s' ...", blocked_hash, domain)
         database.cursor.execute(
-            "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash]
+            "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? OR domain LIKE ? LIMIT 1", [blocked_hash, domain.replace("_")]
         )
 
         row = database.cursor.fetchone()
         logger.debug("row[]='%s'", type(row))
 
         if row is None:
-            logger.debug(f"blocked_hash='{blocked_hash}' not found, trying domain='{domain}' ...")
+            logger.debug("blocked_hash='%s' not found, trying domain='%s' ...", blocked_hash, domain)
             return deobscure(char, domain)
     else:
-        logger.debug(f"Looking up domain='{domain}' ...")
+        logger.debug("Looking up domain='%s' ...", domain)
         database.cursor.execute(
             "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [domain.replace(char, "_")]
         )
@@ -423,23 +321,12 @@ def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple:
         row = database.cursor.fetchone()
         logger.debug("row[]='%s'", type(row))
 
-    logger.debug(f"row[]='{type(row)}' - EXIT!")
+    logger.debug("row[]='%s' - EXIT!", type(row))
     return row
 
 def set_last_blocked(domain: str):
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     # Set timestamp
     _set_data("last_blocked", domain, time.time())
@@ -447,18 +334,7 @@ def set_last_blocked(domain: str):
 
 def set_last_instance_fetch(domain: str):
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     # Set timestamp
     _set_data("last_instance_fetch", domain, time.time())
@@ -466,19 +342,8 @@ def set_last_instance_fetch(domain: str):
 
 def set_total_peers(domain: str, peers: list):
     logger.debug(f"domain='{domain}',peers()={len(peers)} - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(peers, list):
+    domain_helper.raise_on(domain)
+    if not isinstance(peers, list):
         raise ValueError(f"Parameter peers[]='{type(peers)}' is not 'list'")
 
     # Set timestamp
@@ -487,19 +352,8 @@ def set_total_peers(domain: str, peers: list):
 
 def set_nodeinfo_url(domain: str, url: str):
     logger.debug(f"domain='{domain}',url='{url}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(url, str):
+    domain_helper.raise_on(domain)
+    if not isinstance(url, str):
         raise ValueError("Parameter url[]='{type(url)}' is not 'list'")
     elif url == "":
         raise ValueError("Parameter 'url' is empty")
@@ -510,19 +364,8 @@ def set_nodeinfo_url(domain: str, url: str):
 
 def set_detection_mode(domain: str, mode: str):
     logger.debug(f"domain='{domain}',mode='{mode}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(mode, str):
+    domain_helper.raise_on(domain)
+    if not isinstance(mode, str):
         raise ValueError("Parameter mode[]='{type(mode)}' is not 'list'")
     elif mode == "":
         raise ValueError("Parameter 'mode' is empty")
index 39b81d37483c56fe779f60f71a6ad4e9cf17faa2..9441d68bf10ed717873ec32dcba2dabe5ce2bfe8 100644 (file)
 import logging
 
 import bs4
-import validators
 
 from fba import utils
 
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import network
@@ -33,18 +33,7 @@ logger = logging.getLogger(__name__)
 
 def fetch_blocks(domain: str) -> dict:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     blocklist = list()
     block_tag = None
index 1b8c6d23b5616f5ad9e9122fb007ef2c84d7d24c..7a415a7b403c54bc468f87d43ea320e5d30e306f 100644 (file)
@@ -18,13 +18,13 @@ import inspect
 import logging
 
 import bs4
-import validators
 
 from fba import csrf
 from fba import database
 from fba import utils
 
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import federation
@@ -38,18 +38,7 @@ logger = logging.getLogger(__name__)
 
 def fetch_peers(domain: str) -> list:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     peers = list()
 
@@ -97,19 +86,8 @@ def fetch_peers(domain: str) -> list:
 
 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
     logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(origin, str) and origin is not None:
+    domain_helper.raise_on(domain)
+    if not isinstance(origin, str) and origin is not None:
         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
     elif origin == "":
         raise ValueError("Parameter 'origin' is empty")
index ce662da8fd5ee0bc9ef1b9634a162c68fd98a16f..498b0f22cee085158bbeaf2c7464bbcc67596c10 100644 (file)
@@ -18,7 +18,6 @@ import inspect
 import logging
 
 import bs4
-import validators
 
 from fba import csrf
 from fba import database
@@ -26,6 +25,7 @@ from fba import utils
 
 from fba.helpers import blacklist
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import network
@@ -65,18 +65,7 @@ language_mapping = {
 
 def fetch_blocks_from_about(domain: str) -> dict:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug("Fetching mastodon blocks from domain:", domain)
     doc = None
@@ -144,19 +133,8 @@ def fetch_blocks_from_about(domain: str) -> dict:
 
 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
     logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(origin, str) and origin is not None:
+    domain_helper.raise_on(domain)
+    if not isinstance(origin, str) and origin is not None:
         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
     elif origin == "":
         raise ValueError("Parameter 'origin' is empty")
index 1d6fbb438a18ff4303731d6ed4313d37184358c3..cd1097ab00e1da5438f3fefadf78a78f92c322c3 100644 (file)
 
 import json
 import logging
-import validators
 
 from fba import csrf
 from fba import utils
 
 from fba.helpers import config
 from fba.helpers import dicts
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import network
@@ -34,18 +34,7 @@ logger = logging.getLogger(__name__)
 
 def fetch_peers(domain: str) -> list:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug("domain='%s' is misskey, sending API POST request ...", domain)
     peers  = list()
@@ -139,18 +128,7 @@ def fetch_peers(domain: str) -> list:
 
 def fetch_blocks(domain: str) -> dict:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug("Fetching misskey blocks from domain='%s'", domain)
     blocklist = {
index f43a798e0629e2db9e2b90dbed18443876927763..b44f748a1863ab2201d03d0c1ff272d78c449560 100644 (file)
 
 import logging
 
-import validators
-
 from fba import csrf
 
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 
 from fba.http import network
 
@@ -31,18 +30,7 @@ logger = logging.getLogger(__name__)
 
 def fetch_peers(domain: str) -> list:
     logger.debug(f"domain({len(domain)})='{domain}',software='peertube' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug(f"domain='{domain}' is a PeerTube, fetching JSON ...")
     peers   = list()
index 7899906708fba85266280d5280aaa5a0f789dca5..9c745a821ebf881af1b469b711763fba04c20696 100644 (file)
@@ -18,13 +18,13 @@ import inspect
 import logging
 
 import bs4
-import validators
 
 from fba import database
 from fba import utils
 
 from fba.helpers import blacklist
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import federation
@@ -44,19 +44,8 @@ language_mapping = {
 
 def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
     logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(origin, str) and origin is not None:
+    domain_helper.raise_on(domain)
+    if not isinstance(origin, str) and origin is not None:
         raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
     elif origin == "":
         raise ValueError("Parameter 'origin' is empty")
@@ -488,18 +477,7 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
 
 def fetch_blocks_from_about(domain: str) -> dict:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     logger.debug(f"Fetching mastodon blocks from domain='{domain}'")
     doc = None
index d98a8e106f5774767a2c48c4236f8588ed6d5778..fcc12424bd5b063c9f8ab88ad6f7cc6112084ead 100644 (file)
@@ -24,6 +24,7 @@ import validators
 
 from fba.helpers import blacklist
 from fba.helpers import cookies
+from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
 
 from fba.http import federation
@@ -42,18 +43,7 @@ def is_primitive(var: any) -> bool:
 
 def get_hash(domain: str) -> str:
     logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    domain_helper.raise_on(domain)
 
     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
 
@@ -83,29 +73,9 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
 
 def process_domain(domain: str, blocker: str, command: str) -> bool:
     logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
-    elif not validators.domain(domain.split("/")[0]):
-        raise ValueError(f"domain='{domain}' is not a valid domain")
-    elif domain.endswith(".arpa"):
-        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif domain.endswith(".tld"):
-        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-    elif not isinstance(blocker, str):
-        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
-    elif blocker == "":
-        raise ValueError("Parameter 'blocker' is empty")
-    elif not validators.domain(blocker.split("/")[0]):
-        raise ValueError(f"blocker='{blocker}' is not a valid domain")
-    elif blocker.endswith(".arpa"):
-        raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
-    elif blocker.endswith(".tld"):
-        raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
-    elif not isinstance(command, str):
+    domain_helper.raise_on(domain)
+    domain_helper.raise_on(blocker)
+    if not isinstance(command, str):
         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
     elif command == "":
         raise ValueError("Parameter 'command' is empty")
@@ -186,14 +156,14 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
 
 def is_domain_wanted (domain: str) -> bool:
     logger.debug("domain='%s' - CALLED!", domain)
-    wanted = True
 
+    wanted = True
     if not isinstance(domain, str):
         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
     elif domain == "":
         raise ValueError("Parameter 'domain' is empty")
     elif domain.lower() != domain:
-        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+        wanted = False
     elif not validators.domain(domain.split("/")[0]):
         logger.debug("domain='%s' is not a valid domain name - settings False ...", domain)
         wanted = False