From: Roland Häder Date: Wed, 21 Jun 2023 23:03:25 +0000 (+0200) Subject: Continued: X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=40bdbd80a5852e66149e4e3879ac20ed0e0ddacb;p=fba.git Continued: - introduced helper module for domains, need to be aliased to 'domain_helper' to avoid conflict with parameter 'domain' --- diff --git a/fba/csrf.py b/fba/csrf.py index 89abd05..fc6f17b 100644 --- a/fba/csrf.py +++ b/fba/csrf.py @@ -18,10 +18,10 @@ import logging import bs4 import reqto -import validators from fba.helpers import config from fba.helpers import cookies +from fba.helpers import domain as domain_helper from fba.http import network @@ -30,19 +30,8 @@ logger = logging.getLogger(__name__) def determine(domain: str, headers: dict) -> dict: logger.debug(f"domain='{domain}',headers()={len(headers)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(headers, dict): + domain_helper.raise_on(domain) + if not isinstance(headers, dict): raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'") # Default headers with no CSRF diff --git a/fba/helpers/__init__.py b/fba/helpers/__init__.py index b332bae..bccc33f 100644 --- a/fba/helpers/__init__.py +++ b/fba/helpers/__init__.py @@ -19,6 +19,7 @@ __all__ = [ 'config', 'cookies', 'dicts', + 'domain', 'locking', 'tidyup', 'version', diff --git a/fba/helpers/blacklist.py 
b/fba/helpers/blacklist.py index ec76eb0..b223773 100644 --- a/fba/helpers/blacklist.py +++ b/fba/helpers/blacklist.py @@ -16,7 +16,7 @@ import logging -import validators +from fba.helpers import domain as domain_helper logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -43,18 +43,7 @@ blacklist = [ def is_blacklisted(domain: str) -> bool: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) blacklisted = False for peer in blacklist: diff --git a/fba/helpers/cookies.py b/fba/helpers/cookies.py index b40b68b..c012941 100644 --- a/fba/helpers/cookies.py +++ b/fba/helpers/cookies.py @@ -16,7 +16,7 @@ import logging -import validators +from fba.helpers import domain as domain_helper logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -25,20 +25,9 @@ logger = logging.getLogger(__name__) _cookies = {} def store (domain: str, cookies: dict): - logger.debug(f"domain='{domain}',cookies()={len(cookies)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not 
validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(cookies, dict): + logger.debug("domain='%s',cookies()=%d - CALLED!", domain, len(cookies)) + domain_helper.raise_on(domain) + if not isinstance(cookies, dict): raise ValueError(f"Parameter cookies[]='{type(cookies)}' is not 'dict'") _cookies[domain] = cookies @@ -47,19 +36,8 @@ def store (domain: str, cookies: dict): def get_all(domain: str) -> dict: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not has(domain): + domain_helper.raise_on(domain) + if not has(domain): raise Exception(f"domain='{domain}' has no cookies stored, maybe invoke store() first?") logger.debug(f"_cookies[{domain}]()={len(_cookies[domain])} - EXIT!") @@ -67,12 +45,7 @@ def get_all(domain: str) -> dict: def has (domain: str) -> bool: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' 
is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") + domain_helper.raise_on(domain) has_cookies = domain in _cookies @@ -81,18 +54,7 @@ def has (domain: str) -> bool: def clear (domain: str): logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) if has(domain): logger.debug(f"Removing cookies for domain='{domain}' ...") diff --git a/fba/helpers/domain.py b/fba/helpers/domain.py new file mode 100644 index 0000000..ac32cf1 --- /dev/null +++ b/fba/helpers/domain.py @@ -0,0 +1,39 @@ +# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes +# Copyright (C) 2023 Free Software Foundation +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. 
If not, see <https://www.gnu.org/licenses/>. + +import logging + +import validators + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def raise_on(domain: str): + logger.debug("domain='%s' - CALLED!") + if not isinstance(domain, str): + raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") + elif domain == "": + raise ValueError("Parameter 'domain' is empty") + elif domain.lower() != domain: + raise ValueError(f"Parameter domain='{domain}' must be all lower-case") + elif not validators.domain(domain.split("/")[0]): + raise ValueError(f"domain='{domain}' is not a valid domain") + elif domain.endswith(".arpa"): + raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") + elif domain.endswith(".tld"): + raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + + logger.debug("EXIT!") diff --git a/fba/http/federation.py b/fba/http/federation.py index e87b726..ac97163 100644 --- a/fba/http/federation.py +++ b/fba/http/federation.py @@ -24,6 +24,7 @@ from fba import csrf from fba import utils from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.helpers import version @@ -52,19 +53,9 @@ nodeinfo_identifier = [ def fetch_instances(domain: str, origin: str, software: str, command: str, path: str = None): logger.debug(f"domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't 
crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(origin, str) and origin is not None: + domain_helper.raise_on(domain) + + if not isinstance(origin, str) and origin is not None: raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif software is None: logger.debug(f"Updating last_instance_fetch for domain='{domain}' ...") @@ -85,12 +76,6 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path: raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'") elif command == "": raise ValueError("Parameter 'command' is empty") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain") if not instances.is_registered(domain): logger.debug(f"Adding new domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}'") @@ -137,19 +122,9 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path: def fetch_peers(domain: str, software: str) -> list: logger.debug(f"domain({len(domain)})='{domain}',software='{software}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - 
raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(software, str) and software is not None: + domain_helper.raise_on(domain) + + if not isinstance(software, str) and software is not None: raise ValueError(f"software[]='{type(software)}' is not 'str'") if software == "misskey": @@ -219,19 +194,9 @@ def fetch_peers(domain: str, software: str) -> list: def fetch_nodeinfo(domain: str, path: str = None) -> dict: logger.debug("domain='%s',path='%s' - CALLED!", domain, path) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(path, str) and path is not None: + domain_helper.raise_on(domain) + + if not isinstance(path, str) and path is not None: raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") logger.debug("Fetching nodeinfo from domain='%s' ...", domain) @@ -297,18 +262,7 @@ def fetch_nodeinfo(domain: str, path: str = None) -> dict: def fetch_wellknown_nodeinfo(domain: str) -> dict: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise 
ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) # No CSRF by default, you don't have to add network.api_headers by yourself here headers = tuple() @@ -381,19 +335,9 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict: def fetch_generator_from_path(domain: str, path: str = "/") -> str: logger.debug("domain(%d)='%s',path='%s' - CALLED!", len(domain), domain, path) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(path, str): + domain_helper.raise_on(domain) + + if not isinstance(path, str): raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' is empty") @@ -458,19 +402,9 @@ def fetch_generator_from_path(domain: str, path: str = "/") -> str: def determine_software(domain: str, path: str = None) -> str: logger.debug("domain(%d)='%s',path='%s' - CALLED!", len(domain), domain, path) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise 
ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(path, str) and path is not None: + domain_helper.raise_on(domain) + + if not isinstance(path, str) and path is not None: raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") logger.debug("Determining software for domain,path:", domain, path) diff --git a/fba/http/network.py b/fba/http/network.py index da936ac..88a715a 100644 --- a/fba/http/network.py +++ b/fba/http/network.py @@ -20,12 +20,12 @@ import json import reqto import requests import urllib3 -import validators from fba import utils from fba.helpers import config from fba.helpers import cookies +from fba.helpers import domain as domain_helper from fba.models import instances @@ -57,19 +57,9 @@ exceptions = ( def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict: logger.debug(f"domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not 
isinstance(path, str): + domain_helper.raise_on(domain) + + if not isinstance(path, str): raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' cannot be empty") @@ -149,19 +139,9 @@ def fetch_api_url(url: str, timeout: tuple) -> dict: def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict: logger.debug(f"domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(path, str): + domain_helper.raise_on(domain) + + if not isinstance(path, str): raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' cannot be empty") @@ -206,19 +186,9 @@ def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict: def send_bot_post(domain: str, blocklist: dict): logger.debug(f"domain='{domain}',blocklist()={len(blocklist)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise 
ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(blocklist, dict): + domain_helper.raise_on(domain) + + if not isinstance(blocklist, dict): raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'") message = f"{domain} has blocked the following instances:\n\n" @@ -259,19 +229,9 @@ def send_bot_post(domain: str, blocklist: dict): def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response: logger.debug(f"domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(path, str): + domain_helper.raise_on(domain) + + if not isinstance(path, str): raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'") elif path == "": raise ValueError("Parameter 'path' is empty") diff --git a/fba/models/blocks.py b/fba/models/blocks.py index d659d4f..6e6de83 100644 --- a/fba/models/blocks.py +++ b/fba/models/blocks.py @@ -17,11 +17,11 @@ import logging import time -import validators from fba import database from fba.helpers import blacklist +from fba.helpers import domain as domain_helper from fba.helpers import tidyup 
logging.basicConfig(level=logging.INFO) @@ -132,26 +132,13 @@ def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool: def add_instance(blocker: str, blocked: str, reason: str, block_level: str): logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level) - if not isinstance(blocker, str): - raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'") - elif blocker == "": - raise ValueError("Parameter 'blocker' is empty") - elif blocker.lower() != blocker: - raise ValueError(f"Parameter blocker='{blocker}' must be all lower-case") - elif not validators.domain(blocker.split("/")[0]): - raise ValueError(f"Bad blocker='{blocker}'") - elif not isinstance(blocked, str): - raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'") - elif blocked == "": - raise ValueError("Parameter 'blocked' is empty") - elif blocked.lower() != blocked: - raise ValueError(f"Parameter blocked='{blocked}' must be all lower-case") - elif not isinstance(block_level, str): + domain_helper.raise_on(blocker) + domain_helper.raise_on(blocked) + + if not isinstance(block_level, str): raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'") elif block_level == "": raise ValueError("Parameter 'block_level' is empty") - elif not validators.domain(blocked.split("/")[0]): - raise ValueError(f"Bad blocked='{blocked}'") elif blacklist.is_blacklisted(blocker): raise Exception(f"blocker='{blocker}' is blacklisted but function invoked") elif blacklist.is_blacklisted(blocked): diff --git a/fba/models/error_log.py b/fba/models/error_log.py index 8e3b693..bfa27f2 100644 --- a/fba/models/error_log.py +++ b/fba/models/error_log.py @@ -17,30 +17,18 @@ import logging import json import time -import validators - from fba import database from fba.helpers import config +from fba.helpers import domain as domain_helper logging.basicConfig(level=logging.INFO) logger = 
logging.getLogger(__name__) def add(domain: str, error: dict): logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error)) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif config.get("write_error_log").lower() != "true": + domain_helper.raise_on(domain) + if config.get("write_error_log").lower() != "true": logger.debug("Writing to error_log is disabled in configuruation file - EXIT!") return diff --git a/fba/models/instances.py b/fba/models/instances.py index 09f1f82..7a4924d 100644 --- a/fba/models/instances.py +++ b/fba/models/instances.py @@ -27,6 +27,7 @@ from fba import utils from fba.helpers import blacklist from fba.helpers import cache from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.http import federation from fba.http import network @@ -62,23 +63,12 @@ _pending = { } def _set_data(key: str, domain: str, value: any): - logger.debug(f"key='{key}',domain='{domain}',value[]='{type(value)}' - CALLED!") + logger.debug("key='%s',domain='%s',value[]='%s' - CALLED!", key, domain, type(value)) + domain_helper.raise_on(domain) if not isinstance(key, str): raise ValueError("Parameter key[]='{type(key)}' is not 'str'") elif key == "": raise ValueError("Parameter 'key' is empty") - elif not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise 
ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") elif not key in _pending: raise ValueError(f"key='{key}' not found in _pending") elif not utils.is_primitive(value): @@ -91,59 +81,37 @@ def _set_data(key: str, domain: str, value: any): def has_pending(domain: str) -> bool: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) has = False for key in _pending: - logger.debug(f"key='{key}',domain='{domain}',_pending[key]()='{len(_pending[key])}'") + logger.debug("key='%s',domain='%s',_pending[key]()=%d", key, domain, len(_pending[key])) if domain in _pending[key]: has = True break - logger.debug(f"has='{has}' - EXIT!") + logger.debug("has='%s' - EXIT!", has) return has def update_data(domain: str): logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter 
domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not has_pending(domain): + domain_helper.raise_on(domain) + if not has_pending(domain): raise Exception(f"domain='{domain}' has no pending instance data, but function invoked") elif not is_registered(domain): raise Exception(f"domain='{domain}' cannot be updated while not being registered") - logger.debug(f"Updating instance data for domain='{domain}' ...") + logger.debug("Updating instance data for domain='%s' ...", domain) sql_string = "" fields = list() for key in _pending: - logger.debug("key:", key) + logger.debug("Checking key='%s',domain='%s'", key, domain) if domain in _pending[key]: - logger.debug(f"Adding '{_pending[key][domain]}' for key='{key}' ...") + logger.debug("Adding '%s' for key='%s' ...", _pending[key][domain], key) fields.append(_pending[key][domain]) sql_string += f" {key} = ?," - logger.debug(f"sql_string()={len(sql_string)}") + logger.debug("sql_string()=%d", len(sql_string)) if sql_string == "": raise ValueError(f"No fields have been set, but method invoked, domain='{domain}'") @@ -153,43 +121,32 @@ def update_data(domain: str): # For WHERE statement fields.append(domain) - logger.debug(f"sql_string='{sql_string}',fields()={len(fields)}") + logger.debug("sql_string='%s',fields()=%d", sql_string, len(fields)) sql_string = "UPDATE instances SET" + sql_string + " last_updated = ? WHERE domain = ? 
LIMIT 1" - logger.debug("sql_string:", sql_string) - logger.debug("Executing SQL:", sql_string) + logger.debug("Executing SQL: '%s'", sql_string) database.cursor.execute(sql_string, fields) - logger.debug(f"Success! (rowcount={database.cursor.rowcount })") + logger.debug("rowcount=%d", database.cursor.rowcount) if database.cursor.rowcount == 0: raise Exception(f"Did not update any rows: domain='{domain}',fields()={len(fields)}") logger.debug("Invoking commit() ...") database.connection.commit() - logger.debug(f"Deleting _pending for domain='{domain}'") + logger.debug("Deleting _pending for domain='%s'", domain) for key in _pending: - logger.debug(f"domain='{domain}',key='{key}'") + logger.debug("domain='%s',key='%s'", domain, key) if domain in _pending[key]: + logger.debug("Deleting key='%s',domain='%s' ...", key, domain) del _pending[key][domain] logger.debug("EXIT!") def add(domain: str, origin: str, command: str, path: str = None, software: str = None): - logger.debug(f"domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(origin, str) and origin is not None: + logger.debug("domain='%s',origin='%s',command='%s',path='%s',software='%s' - CALLED!", domain, origin, command, path, software) + domain_helper.raise_on(domain) + if not isinstance(origin, str) 
and origin is not None: raise ValueError(f"origin[]='{type(origin)}' is not 'str'") elif origin == "": raise ValueError("Parameter 'origin' is empty") @@ -197,8 +154,6 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str raise ValueError(f"command[]='{type(command)}' is not 'str'") elif command == "": raise ValueError("Parameter 'command' is empty") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"Bad domain name='{domain}'") elif not isinstance(path, str) and path is not None: raise ValueError(f"path[]='{type(path)}' is not 'str'") elif path == "": @@ -207,20 +162,16 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str raise ValueError(f"software[]='{type(software)}' is not 'str'") elif software == "": raise ValueError("Parameter 'software' is empty") - elif domain.endswith(".arpa"): - raise ValueError(f"Please don't crawl .arpa domains: domain='{domain}'") elif origin is not None and not validators.domain(origin.split("/")[0]): raise ValueError(f"Bad origin name='{origin}'") elif blacklist.is_blacklisted(domain): raise Exception(f"domain='{domain}' is blacklisted, but method invoked") elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (software == "lemmy" and domain.find("/c/") > 0): raise Exception(f"domain='{domain}' is a single user") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") if software is None: try: - logger.debug("domain,origin,command,path:", domain, origin, command, path) + logger.debug("domain='%s',origin='%s',command='%s',path='%s'", domain, origin, command, path) software = federation.determine_software(domain, path) except network.exceptions as exception: logger.warning("Exception '%s' during determining software type, domain='%s'", type(exception), domain) @@ -233,7 +184,7 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str 
logger.warning("domain='%s' already registered after cutting off user part. - EXIT!", domain) return - logger.info("Adding instance domain='%s' (origin='%s',software='%s')", domain, origin, software) + logger.info("Adding instance domain='%s',origin='%s',software='%s',command='%s'", domain, origin, software, command) database.cursor.execute( "INSERT INTO instances (domain, origin, command, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)", ( @@ -246,96 +197,64 @@ def add(domain: str, origin: str, command: str, path: str = None, software: str ), ) - logger.debug(f"Marking domain='{domain}' as registered.") + logger.debug("Marking domain='%s' as registered.", domain) cache.set_sub_key("is_registered", domain, True) if has_pending(domain): - logger.debug(f"domain='{domain}' has pending nodeinfo being updated ...") + logger.debug("domain='%s' has pending nodeinfo being updated ...", domain) update_data(domain) logger.debug("EXIT!") def set_last_nodeinfo(domain: str): logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug("Updating last_nodeinfo for domain:", domain) _set_data("last_nodeinfo", domain, time.time()) # Running pending updated - logger.debug(f"Invoking update_data({domain}) ...") + logger.debug("Invoking update_data(%s) ...", domain) update_data(domain) 
logger.debug("EXIT!") def set_last_error(domain: str, error: dict): - logger.debug("domain,error[]:", domain, type(error)) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - - logger.debug("BEFORE error[]:", type(error)) + logger.debug("domain='%s',error[]='%s' - CALLED!", domain, type(error)) + domain_helper.raise_on(domain) + + logger.debug("error[]='%s' - BEFORE!", type(error)) if isinstance(error, (BaseException, json.decoder.JSONDecodeError)): error = f"error[{type(error)}]='{str(error)}'" - logger.debug("AFTER error[]:", type(error)) + logger.debug("error[]='%s' - AFTER!", type(error)) if isinstance(error, str): - logger.debug(f"Setting last_error_details='{error}'") + logger.debug("Setting last_error_details='%s' (str)", error) _set_data("last_status_code" , domain, 999) _set_data("last_error_details", domain, error if error != "" else None) elif isinstance(error, requests.models.Response): - logger.debug(f"Setting last_error_details='{error.reason}'") + logger.debug("Setting last_error_details='%s' (Response)", error.reason) _set_data("last_status_code" , domain, error.status_code) _set_data("last_error_details", domain, error.reason if error.reason != "" else None) elif not isinstance(error, dict): raise KeyError(f"Cannot handle keys in error[{type(error)}]='{error}'") elif "status_code" in error and "error_message" in error: - logger.debug(f"Setting 
last_error_details='{error['error_message']}'") + logger.debug("Setting last_error_details='%s' (error_message)", error['error_message']) _set_data("last_status_code" , domain, error["status_code"]) _set_data("last_error_details", domain, error["error_message"] if error["error_message"] != "" else None) elif "json" in error and "error" in error["json"]: + logger.debug("Setting last_error_details='%s' (json,error)", error["json"]["error"]) _set_data("last_status_code" , domain, error["status_code"]) _set_data("last_error_details", domain, error["json"]["error"] if error["json"]["error"] != "" else None) - logger.debug(f"Invoking error_log.add(domain='{domain}',error[]='{type(error)}'") + logger.debug("Invoking error_log.add(domain='%s',error[]='%s'", domain, type(error)) error_log.add(domain, error) logger.debug("EXIT!") def is_registered(domain: str) -> bool: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) if not cache.key_exists("is_registered"): @@ -348,24 +267,13 @@ def is_registered(domain: str) -> bool: # Is cache found? 
registered = cache.sub_key_exists("is_registered", domain) - logger.debug(f"registered='{registered}' - EXIT!") + logger.debug("registered='%s' - EXIT!", registered) return registered def is_recent(domain: str) -> bool: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not is_registered(domain): + domain_helper.raise_on(domain) + if not is_registered(domain): logger.debug(f"domain='{domain}' is not registered, returning False - EXIT!") return False @@ -375,47 +283,37 @@ def is_recent(domain: str) -> bool: # Fetch row fetched = database.cursor.fetchone()[0] - logger.debug(f"fetched[{type(fetched)}]='{fetched}'") + logger.debug("fetched[%s]='%s'", type(fetched), fetched) recently = isinstance(fetched, float) and time.time() - fetched <= config.get("recheck_instance") - logger.debug(f"recently='{recently}' - EXIT!") + logger.debug("recently='%s' - EXIT!", recently) return recently def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple: - logger.debug(f"char='{char}',domain='{domain}',blocked_hash='{blocked_hash}' - CALLED!") + logger.debug("char='%s',domain='%s',blocked_hash='%s' - CALLED!", char, domain, blocked_hash) + domain_helper.raise_on(domain) if not isinstance(char, str): raise ValueError(f"Parameter char[]='{type(char)}' is not 'str'") elif char == "": raise ValueError("Parameter 
'char' is empty") - elif not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") elif not isinstance(blocked_hash, str) and blocked_hash is not None: raise ValueError(f"Parameter blocked_hash[]='{type(blocked_hash)}' is not 'str'") + logger.debug("blocked_hash[]='%s'", type(blocked_hash)) if isinstance(blocked_hash, str): - logger.debug(f"Looking up blocked_hash='{blocked_hash}' ...") + logger.debug("Looking up blocked_hash='%s',domain='%s' ...", blocked_hash, domain) database.cursor.execute( - "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash] + "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? OR domain LIKE ? LIMIT 1", [blocked_hash, domain.replace("_")] ) row = database.cursor.fetchone() logger.debug("row[]='%s'", type(row)) if row is None: - logger.debug(f"blocked_hash='{blocked_hash}' not found, trying domain='{domain}' ...") + logger.debug("blocked_hash='%s' not found, trying domain='%s' ...", blocked_hash, domain) return deobscure(char, domain) else: - logger.debug(f"Looking up domain='{domain}' ...") + logger.debug("Looking up domain='%s' ...", domain) database.cursor.execute( "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? 
ORDER BY rowid LIMIT 1", [domain.replace(char, "_")] ) @@ -423,23 +321,12 @@ def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple: row = database.cursor.fetchone() logger.debug("row[]='%s'", type(row)) - logger.debug(f"row[]='{type(row)}' - EXIT!") + logger.debug("row[]='%s' - EXIT!", type(row)) return row def set_last_blocked(domain: str): logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) # Set timestamp _set_data("last_blocked", domain, time.time()) @@ -447,18 +334,7 @@ def set_last_blocked(domain: str): def set_last_instance_fetch(domain: str): logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + 
domain_helper.raise_on(domain) # Set timestamp _set_data("last_instance_fetch", domain, time.time()) @@ -466,19 +342,8 @@ def set_last_instance_fetch(domain: str): def set_total_peers(domain: str, peers: list): logger.debug(f"domain='{domain}',peers()={len(peers)} - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(peers, list): + domain_helper.raise_on(domain) + if not isinstance(peers, list): raise ValueError(f"Parameter peers[]='{type(peers)}' is not 'list'") # Set timestamp @@ -487,19 +352,8 @@ def set_total_peers(domain: str, peers: list): def set_nodeinfo_url(domain: str, url: str): logger.debug(f"domain='{domain}',url='{url}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(url, str): + 
domain_helper.raise_on(domain) + if not isinstance(url, str): raise ValueError("Parameter url[]='{type(url)}' is not 'list'") elif url == "": raise ValueError("Parameter 'url' is empty") @@ -510,19 +364,8 @@ def set_nodeinfo_url(domain: str, url: str): def set_detection_mode(domain: str, mode: str): logger.debug(f"domain='{domain}',mode='{mode}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(mode, str): + domain_helper.raise_on(domain) + if not isinstance(mode, str): raise ValueError("Parameter mode[]='{type(mode)}' is not 'list'") elif mode == "": raise ValueError("Parameter 'mode' is empty") diff --git a/fba/networks/friendica.py b/fba/networks/friendica.py index 39b81d3..9441d68 100644 --- a/fba/networks/friendica.py +++ b/fba/networks/friendica.py @@ -17,11 +17,11 @@ import logging import bs4 -import validators from fba import utils from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import network @@ -33,18 +33,7 @@ logger = logging.getLogger(__name__) def fetch_blocks(domain: str) -> dict: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - 
raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) blocklist = list() block_tag = None diff --git a/fba/networks/lemmy.py b/fba/networks/lemmy.py index 1b8c6d2..7a415a7 100644 --- a/fba/networks/lemmy.py +++ b/fba/networks/lemmy.py @@ -18,13 +18,13 @@ import inspect import logging import bs4 -import validators from fba import csrf from fba import database from fba import utils from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import federation @@ -38,18 +38,7 @@ logger = logging.getLogger(__name__) def fetch_peers(domain: str) -> list: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) peers = list() @@ -97,19 +86,8 @@ def fetch_peers(domain: str) -> list: def fetch_blocks(domain: str, origin: str, nodeinfo_url: str): 
logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(origin, str) and origin is not None: + domain_helper.raise_on(domain) + if not isinstance(origin, str) and origin is not None: raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif origin == "": raise ValueError("Parameter 'origin' is empty") diff --git a/fba/networks/mastodon.py b/fba/networks/mastodon.py index ce662da..498b0f2 100644 --- a/fba/networks/mastodon.py +++ b/fba/networks/mastodon.py @@ -18,7 +18,6 @@ import inspect import logging import bs4 -import validators from fba import csrf from fba import database @@ -26,6 +25,7 @@ from fba import utils from fba.helpers import blacklist from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import network @@ -65,18 +65,7 @@ language_mapping = { def fetch_blocks_from_about(domain: str) -> dict: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not 
validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug("Fetching mastodon blocks from domain:", domain) doc = None @@ -144,19 +133,8 @@ def fetch_blocks_from_about(domain: str) -> dict: def fetch_blocks(domain: str, origin: str, nodeinfo_url: str): logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(origin, str) and origin is not None: + domain_helper.raise_on(domain) + if not isinstance(origin, str) and origin is not None: raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif origin == "": raise ValueError("Parameter 'origin' is empty") diff --git a/fba/networks/misskey.py b/fba/networks/misskey.py index 1d6fbb4..cd1097a 100644 --- a/fba/networks/misskey.py +++ b/fba/networks/misskey.py @@ -16,13 +16,13 @@ import json import logging -import validators from fba import csrf from fba import utils from fba.helpers import config from fba.helpers import dicts +from fba.helpers import domain as 
domain_helper from fba.helpers import tidyup from fba.http import network @@ -34,18 +34,7 @@ logger = logging.getLogger(__name__) def fetch_peers(domain: str) -> list: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug("domain='%s' is misskey, sending API POST request ...", domain) peers = list() @@ -139,18 +128,7 @@ def fetch_peers(domain: str) -> list: def fetch_blocks(domain: str) -> dict: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug("Fetching misskey blocks from domain='%s'", domain) blocklist = { diff --git a/fba/networks/peertube.py b/fba/networks/peertube.py 
index f43a798..b44f748 100644 --- a/fba/networks/peertube.py +++ b/fba/networks/peertube.py @@ -16,11 +16,10 @@ import logging -import validators - from fba import csrf from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.http import network @@ -31,18 +30,7 @@ logger = logging.getLogger(__name__) def fetch_peers(domain: str) -> list: logger.debug(f"domain({len(domain)})='{domain}',software='peertube' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug(f"domain='{domain}' is a PeerTube, fetching JSON ...") peers = list() diff --git a/fba/networks/pleroma.py b/fba/networks/pleroma.py index 7899906..9c745a8 100644 --- a/fba/networks/pleroma.py +++ b/fba/networks/pleroma.py @@ -18,13 +18,13 @@ import inspect import logging import bs4 -import validators from fba import database from fba import utils from fba.helpers import blacklist from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import federation @@ -44,19 +44,8 @@ language_mapping = { def fetch_blocks(domain: str, origin: str, nodeinfo_url: str): logger.debug(f"domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == 
"": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(origin, str) and origin is not None: + domain_helper.raise_on(domain) + if not isinstance(origin, str) and origin is not None: raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'") elif origin == "": raise ValueError("Parameter 'origin' is empty") @@ -488,18 +477,7 @@ def fetch_blocks(domain: str, origin: str, nodeinfo_url: str): def fetch_blocks_from_about(domain: str) -> dict: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) logger.debug(f"Fetching mastodon blocks from domain='{domain}'") doc = None diff --git a/fba/utils.py b/fba/utils.py index d98a8e1..fcc1242 100644 --- a/fba/utils.py +++ b/fba/utils.py @@ -24,6 +24,7 @@ import validators from fba.helpers import blacklist from fba.helpers import cookies +from 
fba.helpers import domain as domain_helper from fba.helpers import tidyup from fba.http import federation @@ -42,18 +43,7 @@ def is_primitive(var: any) -> bool: def get_hash(domain: str) -> str: logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain) - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") + domain_helper.raise_on(domain) return hashlib.sha256(domain.encode("utf-8")).hexdigest() @@ -83,29 +73,9 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon def process_domain(domain: str, blocker: str, command: str) -> bool: logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!") - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") - elif not validators.domain(domain.split("/")[0]): - raise ValueError(f"domain='{domain}' is not a valid domain") - elif domain.endswith(".arpa"): - raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!") - elif domain.endswith(".tld"): - raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!") - elif not isinstance(blocker, str): - raise ValueError(f"Parameter 
blocker[]='{type(blocker)}' is not 'str'") - elif blocker == "": - raise ValueError("Parameter 'blocker' is empty") - elif not validators.domain(blocker.split("/")[0]): - raise ValueError(f"blocker='{blocker}' is not a valid domain") - elif blocker.endswith(".arpa"): - raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!") - elif blocker.endswith(".tld"): - raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!") - elif not isinstance(command, str): + domain_helper.raise_on(domain) + domain_helper.raise_on(blocker) + if not isinstance(command, str): raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'") elif command == "": raise ValueError("Parameter 'command' is empty") @@ -186,14 +156,14 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: def is_domain_wanted (domain: str) -> bool: logger.debug("domain='%s' - CALLED!", domain) - wanted = True + wanted = True if not isinstance(domain, str): raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'") elif domain == "": raise ValueError("Parameter 'domain' is empty") elif domain.lower() != domain: - raise ValueError(f"Parameter domain='{domain}' must be all lower-case") + wanted = False elif not validators.domain(domain.split("/")[0]): logger.debug("domain='%s' is not a valid domain name - settings False ...", domain) wanted = False