]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Wed, 26 Jul 2023 10:08:05 +0000 (12:08 +0200)
committerRoland Häder <roland@mxchange.org>
Wed, 26 Jul 2023 10:08:05 +0000 (12:08 +0200)
- moved/renamed utils.is_domain_wanted() to domain_helper.is_wanted()
- removed no longer used imports
- tpzo fixed: sof(t)ware

13 files changed:
daemon.py
fba/commands.py
fba/helpers/domain.py
fba/helpers/processing.py
fba/helpers/software.py
fba/helpers/version.py
fba/http/federation.py
fba/models/instances.py
fba/networks/friendica.py
fba/networks/lemmy.py
fba/networks/misskey.py
fba/networks/pleroma.py
fba/utils.py

index 4ae10a49e81fbb6080969f558790ed4878a8876e..bfede5053fd4d0b566d77089092de532ee12a3cc 100755 (executable)
--- a/daemon.py
+++ b/daemon.py
@@ -22,7 +22,6 @@ import re
 from datetime import datetime
 from email.utils import format_datetime
 from pathlib import Path
-from urllib.parse import urlparse
 
 import fastapi
 from fastapi import Request, HTTPException, Query
@@ -39,6 +38,7 @@ from fba import utils
 
 from fba.helpers import blacklist
 from fba.helpers import config
+from fba.helpers import domain as domain_helper
 from fba.helpers import json as json_helper
 from fba.helpers import tidyup
 
@@ -140,7 +140,7 @@ def api_index(request: Request, mode: str, value: str, amount: int):
         )
     elif mode in ["domain", "reverse"]:
         domain = tidyup.domain(value)
-        if not utils.is_domain_wanted(domain):
+        if not domain_helper.is_wanted(domain):
             raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
 
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
@@ -220,7 +220,7 @@ def api_domain(domain: str):
     # Tidy up domain name
     domain = tidyup.domain(domain).encode("idna").decode("utf-8")
 
-    if not utils.is_domain_wanted(domain):
+    if not domain_helper.is_wanted(domain):
         raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
 
     # Fetch domain data
@@ -386,7 +386,7 @@ def list_domains(request: Request, mode: str, value: str, amount: int = config.g
 def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
     if mode == "block_level" and not blocks.valid(value, "block_level"):
         raise HTTPException(status_code=500, detail="Invalid block level provided")
-    elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
+    elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
         raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
 
     response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
@@ -422,7 +422,7 @@ def infos(request: Request, domain: str):
     # Tidy up domain name
     domain = tidyup.domain(domain).encode("idna").decode("utf-8")
 
-    if not utils.is_domain_wanted(domain):
+    if not domain_helper.is_wanted(domain):
         raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
 
     response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")
index 1efe1097b34239044667c6df1854ff7b02a87abf..cd7deb975a24282b5b1807b943babf4a70bb43d7 100644 (file)
@@ -37,6 +37,7 @@ from fba.helpers import blacklist
 from fba.helpers import config
 from fba.helpers import cookies
 from fba.helpers import dicts as dict_helper
+from fba.helpers import domain as domain_helper
 from fba.helpers import locking
 from fba.helpers import processing
 from fba.helpers import software as software_helper
@@ -153,7 +154,7 @@ def fetch_pixelfed_api(args: argparse.Namespace) -> int:
             domain = row["domain"].encode("idna").decode("utf-8")
             logger.debug("domain='%s' - AFTER!", domain)
 
-            if not utils.is_domain_wanted(domain):
+            if not domain_helper.is_wanted(domain):
                 logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif instances.is_registered(domain):
@@ -224,7 +225,7 @@ def fetch_bkali(args: argparse.Namespace) -> int:
             elif entry["domain"] == "":
                 logger.debug("entry[domain] is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(entry["domain"]):
+            elif not domain_helper.is_wanted(entry["domain"]):
                 logger.debug("entry[domain]='%s' is not wanted - SKIPPED!", entry["domain"])
                 continue
             elif instances.is_registered(entry["domain"]):
@@ -314,7 +315,7 @@ def fetch_blocks(args: argparse.Namespace) -> int:
         elif nodeinfo_url is None or nodeinfo_url == "":
             logger.debug("blocker='%s',software='%s' has empty nodeinfo_url", blocker, software)
             continue
-        elif not utils.is_domain_wanted(blocker):
+        elif not domain_helper.is_wanted(blocker):
             logger.debug("blocker='%s' is not wanted - SKIPPED!", blocker)
             continue
 
@@ -326,18 +327,23 @@ def fetch_blocks(args: argparse.Namespace) -> int:
         if software == "pleroma":
             logger.info("blocker='%s',software='%s'", blocker, software)
             blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
+            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
         elif software == "mastodon":
             logger.info("blocker='%s',software='%s'", blocker, software)
             blocking = mastodon.fetch_blocks(blocker, nodeinfo_url)
+            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
         elif software == "lemmy":
             logger.info("blocker='%s',software='%s'", blocker, software)
             blocking = lemmy.fetch_blocks(blocker, nodeinfo_url)
+            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
         elif software == "friendica":
             logger.info("blocker='%s',software='%s'", blocker, software)
             blocking = friendica.fetch_blocks(blocker)
+            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
         elif software == "misskey":
             logger.info("blocker='%s',software='%s'", blocker, software)
             blocking = misskey.fetch_blocks(blocker)
+            logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
         else:
             logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
 
@@ -412,7 +418,7 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             block["blocked"] = block["blocked"].lstrip(".").encode("idna").decode("utf-8")
             logger.debug("block[blocked]='%s' - AFTER!", block["blocked"])
 
-            if not utils.is_domain_wanted(block["blocked"]):
+            if not domain_helper.is_wanted(block["blocked"]):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
                 continue
             elif block["block_level"] in ["accept", "accepted"]:
@@ -540,7 +546,7 @@ def fetch_observer(args: argparse.Namespace) -> int:
             domain = domain.encode("idna").decode("utf-8")
             logger.debug("domain='%s' - AFTER!", domain)
 
-            if not utils.is_domain_wanted(domain):
+            if not domain_helper.is_wanted(domain):
                 logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif instances.is_registered(domain):
@@ -790,7 +796,7 @@ def fetch_fba_rss(args: argparse.Namespace) -> int:
             domain = domain.encode("idna").decode("utf-8")
             logger.debug("domain='%s' - AFTER!", domain)
 
-            if not utils.is_domain_wanted(domain):
+            if not domain_helper.is_wanted(domain):
                 logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif domain in domains:
@@ -868,7 +874,7 @@ def fetch_fbabot_atom(args: argparse.Namespace) -> int:
                     domain = domain.encode("idna").decode("utf-8")
                     logger.debug("domain='%s' - AFTER!", domain)
 
-                    if not utils.is_domain_wanted(domain):
+                    if not domain_helper.is_wanted(domain):
                         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                         continue
                     elif domain in domains:
@@ -945,7 +951,7 @@ def fetch_instances(args: argparse.Namespace) -> int:
         domain = row["domain"].encode("idna").decode("utf-8")
         logger.debug("domain='%s' - AFTER!", domain)
 
-        if not utils.is_domain_wanted(domain):
+        if not domain_helper.is_wanted(domain):
             logger.debug("Domain domain='%s' is not wanted - SKIPPED!", domain)
             continue
 
@@ -1181,7 +1187,7 @@ def fetch_txt(args: argparse.Namespace) -> int:
                 if domain == "":
                     logger.debug("domain is empty - SKIPPED!")
                     continue
-                elif not utils.is_domain_wanted(domain):
+                elif not domain_helper.is_wanted(domain):
                     logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                     continue
                 elif instances.is_recent(domain):
@@ -1242,7 +1248,7 @@ def fetch_fedipact(args: argparse.Namespace) -> int:
             domain = domain.encode("idna").decode("utf-8")
             logger.debug("domain='%s' - AFTER!", domain)
 
-            if not utils.is_domain_wanted(domain):
+            if not domain_helper.is_wanted(domain):
                 logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                 continue
             elif instances.is_registered(domain):
@@ -1293,7 +1299,7 @@ def fetch_joinmobilizon(args: argparse.Namespace) -> int:
         if "host" not in row:
             logger.warning("row='%s' does not contain key 'host' - SKIPPED!", row)
             continue
-        elif not utils.is_domain_wanted(row["host"]):
+        elif not domain_helper.is_wanted(row["host"]):
             logger.debug("row[host]='%s' is not wanted - SKIPPED!", row["host"])
             continue
         elif instances.is_registered(row["host"]):
@@ -1341,7 +1347,7 @@ def fetch_joinmisskey(args: argparse.Namespace) -> int:
         if "url" not in row:
             logger.warning("row()=%d does not have element 'url' - SKIPPED!", len(row))
             continue
-        elif not utils.is_domain_wanted(row["url"]):
+        elif not domain_helper.is_wanted(row["url"]):
             logger.debug("row[url]='%s' is not wanted - SKIPPED!", row["url"])
             continue
         elif instances.is_registered(row["url"]):
@@ -1482,7 +1488,7 @@ def fetch_joinfediverse(args: argparse.Namespace) -> int:
         if block["blocked"] == "":
             logger.debug("block[blocked] is empty - SKIPPED!")
             continue
-        elif not utils.is_domain_wanted(block["blocked"]):
+        elif not domain_helper.is_wanted(block["blocked"]):
             logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
             continue
         elif instances.is_recent(block["blocked"]):
@@ -1505,7 +1511,7 @@ def fetch_joinfediverse(args: argparse.Namespace) -> int:
             if block["blocked"] == "":
                 logger.debug("block[blocked] is empty - SKIPPED!")
                 continue
-            elif not utils.is_domain_wanted(block["blocked"]):
+            elif not domain_helper.is_wanted(block["blocked"]):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
                 continue
 
@@ -1538,7 +1544,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
     logger.debug("Invoking locking.acquire() ...")
     locking.acquire()
 
-    if isinstance(args.domain, str) and args.domain != "" and utils.is_domain_wanted(args.domain):
+    if isinstance(args.domain, str) and args.domain != "" and domain_helper.is_wanted(args.domain):
         database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE has_obfuscation = 1 AND domain = ?", [args.domain])
     elif isinstance(args.software, str) and args.software != "" and validators.domain(args.software) == args.software:
         database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE has_obfuscation = 1 AND software = ?", [args.software])
@@ -1570,7 +1576,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
             logger.debug("domain='%s',software='%s'", row["domain"], row["software"])
             blocking = misskey.fetch_blocks(row["domain"])
         else:
-            logger.warning("Unknown sofware: domain='%s',software='%s'", row["domain"], row["software"])
+            logger.warning("Unknown software: domain='%s',software='%s'", row["domain"], row["software"])
 
         logger.debug("row[domain]='%s'", row["domain"])
         # chaos.social requires special care ...
@@ -1602,7 +1608,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
                 logger.debug("block='%s' is obfuscated.", block["blocked"])
                 obfuscated = obfuscated + 1
                 blocked = utils.deobfuscate(block["blocked"], row["domain"], block["hash"] if "hash" in block else None)
-            elif not utils.is_domain_wanted(block["blocked"]):
+            elif not domain_helper.is_wanted(block["blocked"]):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
                 continue
             elif blocks.is_instance_blocked(row["domain"], block["blocked"]):
@@ -1710,7 +1716,7 @@ def fetch_fedilist(args: argparse.Namespace) -> int:
         domain = domain.encode("idna").decode("utf-8")
         logger.debug("domain='%s' - AFTER!", domain)
 
-        if not utils.is_domain_wanted(domain):
+        if not domain_helper.is_wanted(domain):
             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
             continue
         elif (args.force is None or not args.force) and instances.is_registered(domain):
@@ -1836,7 +1842,7 @@ def fetch_instances_social(args: argparse.Namespace) -> int:
         domain = domain.encode("idna").decode("utf-8")
         logger.debug("domain='%s' - AFTER!", domain)
 
-        if not utils.is_domain_wanted(domain):
+        if not domain_helper.is_wanted(domain):
             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
             continue
         elif domain in domains:
@@ -1913,7 +1919,7 @@ def fetch_relays(args: argparse.Namespace) -> int:
                         continue
 
                     domain = str(domain)
-                    if not utils.is_domain_wanted(domain):
+                    if not domain_helper.is_wanted(domain):
                         logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                         continue
 
@@ -1956,7 +1962,7 @@ def fetch_relays(args: argparse.Namespace) -> int:
                 components = urlparse(link["href"])
                 domain = components.netloc.lower()
 
-                if not utils.is_domain_wanted(domain):
+                if not domain_helper.is_wanted(domain):
                     logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
                     continue
 
index dd4e214c62ad543fea15e09f05358980594fac59..96aa189d4ef7e9c0610a6d86a280929f5ad40b84 100644 (file)
@@ -20,6 +20,10 @@ from urllib.parse import urlparse
 
 import validators
 
+from fba.helpers import blacklist
+
+from fba.models import instances
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
@@ -60,3 +64,38 @@ def is_in_url(domain: str, url: str) -> bool:
 
     logger.debug("is_found='%s' - EXIT!", is_found)
     return is_found
+
+def is_wanted(domain: str) -> bool:
+    logger.debug("domain='%s' - CALLED!", domain)
+
+    wanted = True
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif domain.lower() != domain:
+        wanted = False
+    elif not validators.domain(domain.split("/")[0]):
+        logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
+        wanted = False
+    elif domain.endswith(".arpa"):
+        logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
+        wanted = False
+    elif domain.endswith(".onion"):
+        logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
+        wanted = False
+    elif domain.endswith(".tld"):
+        logger.debug("domain='%s' is a fake domain - setting False ...", domain)
+        wanted = False
+    elif blacklist.is_blacklisted(domain):
+        logger.debug("domain='%s' is blacklisted - setting False ...", domain)
+        wanted = False
+    elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
+        logger.debug("domain='%s' is a single user", domain)
+        wanted = False
+    elif domain.find("/tag/") > 0:
+        logger.debug("domain='%s' is a tag", domain)
+        wanted = False
+
+    logger.debug("wanted='%s' - EXIT!", wanted)
+    return wanted
index 5ffbbfc3a53bee26125b2ce5308babea124379f9..87c313f7aa9262e743f4cbf994d056ebdcffb768 100644 (file)
@@ -46,7 +46,7 @@ def domain(name: str, blocker: str, command: str) -> bool:
         logger.debug("Flushing updates for blocker='%s' ...", blocker)
         instances.update_data(blocker)
 
-    if not utils.is_domain_wanted(name):
+    if not domain_helper.is_wanted(name):
         logger.debug("name='%s' is not wanted - SKIPPED!", name)
         return False
     elif instances.is_recent(name):
index 76286a1042c561e43b61dc9fd4e5bd6fc94dec68..a93b086d6f16fcb91496f43b09d6541cece725a4 100644 (file)
@@ -36,7 +36,7 @@ def alias(software: str) -> str:
         software = "pleroma"
     elif "radiant" in software:
         logger.debug("Setting radiant: software='%s'", software)
-        sofware = "radiant"
+        software = "radiant"
     elif software in ["hometown", "ecko", "fedibird" ] or "되는 마스토돈" in software or "mastodon" in software:
         logger.debug("Setting mastodon: software='%s'", software)
         software = "mastodon"
index 2ea6219617f820584055b46cbba728296484a860..bb8dc9aa614ce4d81e98d0007f1179e09196c149 100644 (file)
@@ -114,7 +114,7 @@ def strip_hosted_on(software: str) -> str:
     elif software == "":
         raise ValueError("Parameter 'software' is empty")
     elif "hosted on" not in software:
-        logger.warning("Cannot find 'hosted on' in sofware='%s'!", software)
+        logger.warning("Cannot find 'hosted on' in software='%s'!", software)
         return software
 
     end = software.find("hosted on ")
index 5e490fc81bcd1dfebba023340b8ed7ae2b3c5415..0395eb94d19b9619577febca5dbc4b9defa75652 100644 (file)
@@ -22,7 +22,6 @@ import requests
 import validators
 
 from fba import csrf
-from fba import utils
 
 from fba.helpers import config
 from fba.helpers import cookies
@@ -126,7 +125,7 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path:
         instance = instance.encode("idna").decode("utf-8")
         logger.debug("instance='%s' - AFTER!", instance)
 
-        if not utils.is_domain_wanted(instance):
+        if not domain_helper.is_wanted(instance):
             logger.debug("instance='%s' is not wanted - SKIPPED!", instance)
             continue
         elif instance.find("/profile/") > 0 or instance.find("/users/") > 0 or (instances.is_registered(instance.split("/")[0]) and instance.find("/c/") > 0):
@@ -417,7 +416,7 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict:
                     components = urlparse(url)
 
                 logger.debug("components.netloc[]='%s'", type(components.netloc))
-                if not utils.is_domain_wanted(components.netloc):
+                if not domain_helper.is_wanted(components.netloc):
                     logger.debug("components.netloc='%s' is not wanted - SKIPPED!", components.netloc)
                     continue
 
@@ -662,7 +661,7 @@ def find_domains(tag: bs4.element.Tag) -> list:
 
         logger.debug("domain='%s',reason='%s'", domain, reason)
 
-        if not utils.is_domain_wanted(domain):
+        if not domain_helper.is_wanted(domain):
             logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
             continue
         elif domain == "gab.com/.ai, develop.gab.com":
@@ -721,7 +720,7 @@ def add_peers(rows: dict) -> list:
                 raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'")
 
             logger.debug("peer[%s]='%s' - AFTER!", type(peer), peer)
-            if not utils.is_domain_wanted(peer):
+            if not domain_helper.is_wanted(peer):
                 logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
                 continue
 
index f27d90e41e9946b4718b335b24324ce838c6eb19..69b6f15c6047d0ac593c8be9752fb6e78af89db1 100644 (file)
@@ -421,7 +421,7 @@ def set_obfuscated_blocks(domain: str, obfuscated: int):
     domain_helper.raise_on(domain)
 
     if not isinstance(obfuscated, int):
-        raise ValueError(f"Parameter obfuscated[]='{type(blocks)}' is not of type 'int'")
+        raise ValueError(f"Parameter obfuscated[]='{type(obfuscated)}' is not of type 'int'")
     elif obfuscated < 0:
         raise ValueError(f"Parameter obfuscated={obfuscated} is not valid")
 
index 96c25088947353abd14b16be23098c31f3e6690d..b1d34fc14d627c59da878ff48283ba459377896c 100644 (file)
@@ -84,7 +84,7 @@ def fetch_blocks(domain: str) -> list:
         if blocked == "":
             logger.debug("line[]='%s' returned empty blocked domain - SKIPPED!", type(line))
             continue
-        elif not utils.is_domain_wanted(blocked):
+        elif not domain_helper.is_wanted(blocked):
             logger.debug("blocked='%s' is not wanted - SKIPPED!", domain)
             continue
 
@@ -92,7 +92,7 @@ def fetch_blocks(domain: str) -> list:
         blocked = utils.deobfuscate(blocked, domain)
 
         logger.debug("blocked[%s]='%s' - DEOBFUSCATED!", type(blocked), blocked)
-        if not utils.is_domain_wanted(blocked):
+        if not domain_helper.is_wanted(blocked):
             logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
             continue
 
index 42c4eb4f37b8e1182aa2a16c9941946f64e9846b..7b427fc91ca05db6a85238be3015bb2b76b41294 100644 (file)
@@ -20,7 +20,6 @@ import logging
 import bs4
 
 from fba import csrf
-from fba import utils
 
 from fba.helpers import config
 from fba.helpers import domain as domain_helper
@@ -154,7 +153,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 logger.debug("Checking %d containers ...", len(containers))
                 for container in containers:
                     logger.debug("container[]='%s'", type(container))
-                    for header in container.find_all(["h2", "h3", "h4", "h5"]): 
+                    for header in container.find_all(["h2", "h3", "h4", "h5"]):
                         content = header
                         logger.debug("header[%s]='%s' - BEFORE!", type(header), header)
                         if header is not None:
@@ -210,7 +209,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 if blocked == "":
                     logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0])
                     continue
-                elif not utils.is_domain_wanted(blocked):
+                elif not domain_helper.is_wanted(blocked):
                     logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                     continue
 
@@ -271,7 +270,7 @@ def fetch_instances(domain: str, origin: str) -> list:
                         if peer == "":
                             logger.debug("peer is empty - SKIPPED!")
                             continue
-                        elif not utils.is_domain_wanted(peer):
+                        elif not domain_helper.is_wanted(peer):
                             logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
                             continue
                         elif peer in peers:
@@ -367,7 +366,7 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
                 if peer == "":
                     logger.debug("peer is empty - SKIPPED!")
                     continue
-                elif not utils.is_domain_wanted(peer):
+                elif not domain_helper.is_wanted(peer):
                     logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
                     continue
                 elif peer in peers:
index c2541113faca0f347395db0ead1bf1da73b482de..4227b2c9729e89478c9aa43e57b83cad2965f0a8 100644 (file)
@@ -18,7 +18,6 @@ import json
 import logging
 
 from fba import csrf
-from fba import utils
 
 from fba.helpers import config
 from fba.helpers import dicts as dict_helper
@@ -107,7 +106,7 @@ def fetch_peers(domain: str) -> list:
             elif not isinstance(row["host"], str):
                 logger.warning("row[host][]='%s' is not of type 'str' - SKIPPED!", type(row['host']))
                 continue
-            elif not utils.is_domain_wanted(row["host"]):
+            elif not domain_helper.is_wanted(row["host"]):
                 logger.debug("row[host]='%s' is not wanted, domain='%s' - SKIPPED!", row['host'], domain)
                 continue
             elif row["host"] in peers:
index fbe03dac65ef55dfec2d025402e2541440b668d7..320e3f576ec14b911043bcbc1b62e6c9358c623e 100644 (file)
@@ -130,7 +130,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                     if blocked == "":
                         logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level)
                         continue
-                    elif not utils.is_domain_wanted(blocked):
+                    elif not domain_helper.is_wanted(blocked):
                         logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                         continue
 
@@ -138,7 +138,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                     blocked = utils.deobfuscate(blocked, domain)
 
                     logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
-                    if not utils.is_domain_wanted(blocked):
+                    if not domain_helper.is_wanted(blocked):
                         logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                         continue
 
@@ -163,7 +163,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
             if blocked == "":
                 logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                 continue
-            elif not utils.is_domain_wanted(blocked):
+            elif not domain_helper.is_wanted(blocked):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                 continue
 
@@ -171,7 +171,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
             blocked = utils.deobfuscate(blocked, domain)
 
             logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
-            if not utils.is_domain_wanted(blocked):
+            if not domain_helper.is_wanted(blocked):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                 continue
 
@@ -232,7 +232,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 if blocked == "":
                     logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                     continue
-                elif not utils.is_domain_wanted(blocked):
+                elif not domain_helper.is_wanted(blocked):
                     logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                     continue
 
@@ -266,7 +266,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
             elif blocked == "":
                 logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
                 continue
-            elif not utils.is_domain_wanted(blocked):
+            elif not domain_helper.is_wanted(blocked):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                 continue
 
@@ -274,7 +274,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
             blocked = utils.deobfuscate(blocked, domain)
 
             logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
-            if not utils.is_domain_wanted(blocked):
+            if not domain_helper.is_wanted(blocked):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                 continue
 
@@ -304,7 +304,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                     block["blocked"] = utils.deobfuscate(block["blocked"], domain)
 
                     logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"])
-                    if not utils.is_domain_wanted(block["blocked"]):
+                    if not domain_helper.is_wanted(block["blocked"]):
                         logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
                         continue
 
index ee2376bebcf13e2963c4ac5aed8f70d3c322951d..44c831014cbafd3db123376cebba73a536e70f49 100644 (file)
@@ -20,9 +20,7 @@ from urllib.parse import urlparse
 
 import bs4
 import requests
-import validators
 
-from fba.helpers import blacklist
 from fba.helpers import config
 from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup
@@ -99,7 +97,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
         domain = domain.encode("idna").decode("utf-8")
         logger.debug("domain='%s' - AFTER!", domain)
 
-        if not is_domain_wanted(domain):
+        if not domain_helper.is_wanted(domain):
             logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
             continue
 
@@ -109,41 +107,6 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
     logger.debug("domains()=%d - EXIT!", len(domains))
     return domains
 
-def is_domain_wanted(domain: str) -> bool:
-    logger.debug("domain='%s' - CALLED!", domain)
-
-    wanted = True
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif domain.lower() != domain:
-        wanted = False
-    elif not validators.domain(domain.split("/")[0]):
-        logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
-        wanted = False
-    elif domain.endswith(".arpa"):
-        logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
-        wanted = False
-    elif domain.endswith(".onion"):
-        logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
-        wanted = False
-    elif domain.endswith(".tld"):
-        logger.debug("domain='%s' is a fake domain - setting False ...", domain)
-        wanted = False
-    elif blacklist.is_blacklisted(domain):
-        logger.debug("domain='%s' is blacklisted - setting False ...", domain)
-        wanted = False
-    elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
-        logger.debug("domain='%s' is a single user", domain)
-        wanted = False
-    elif domain.find("/tag/") > 0:
-        logger.debug("domain='%s' is a tag", domain)
-        wanted = False
-
-    logger.debug("wanted='%s' - EXIT!", wanted)
-    return wanted
-
 def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
     logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
     domain_helper.raise_on(blocker)