From: Roland Häder Date: Wed, 26 Jul 2023 10:08:05 +0000 (+0200) Subject: Continued: X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=6e4c3c91dbad206794edd73cffe41308a70d7ba9;p=fba.git Continued: - moved/renamed utils.is_domain_wanted() to domain_helper.is_wanted() - removed no longer used imports - tpzo fixed: sof(t)ware --- diff --git a/daemon.py b/daemon.py index 4ae10a4..bfede50 100755 --- a/daemon.py +++ b/daemon.py @@ -22,7 +22,6 @@ import re from datetime import datetime from email.utils import format_datetime from pathlib import Path -from urllib.parse import urlparse import fastapi from fastapi import Request, HTTPException, Query @@ -39,6 +38,7 @@ from fba import utils from fba.helpers import blacklist from fba.helpers import config +from fba.helpers import domain as domain_helper from fba.helpers import json as json_helper from fba.helpers import tidyup @@ -140,7 +140,7 @@ def api_index(request: Request, mode: str, value: str, amount: int): ) elif mode in ["domain", "reverse"]: domain = tidyup.domain(value) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted") wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):]) @@ -220,7 +220,7 @@ def api_domain(domain: str): # Tidy up domain name domain = tidyup.domain(domain).encode("idna").decode("utf-8") - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted") # Fetch domain data @@ -386,7 +386,7 @@ def list_domains(request: Request, mode: str, value: str, amount: int = config.g def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")): if mode == "block_level" and not blocks.valid(value, "block_level"): raise HTTPException(status_code=500, detail="Invalid block level provided") - elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value): + elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value): raise HTTPException(status_code=500, detail="Invalid or blocked domain specified") response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}") @@ -422,7 +422,7 @@ def infos(request: Request, domain: str): # Tidy up domain name domain = tidyup.domain(domain).encode("idna").decode("utf-8") - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted") response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}") diff --git a/fba/commands.py b/fba/commands.py index 1efe109..cd7deb9 100644 --- a/fba/commands.py +++ b/fba/commands.py @@ -37,6 +37,7 @@ from fba.helpers import blacklist from fba.helpers import config from fba.helpers import cookies from fba.helpers import dicts as dict_helper +from fba.helpers import domain as domain_helper from fba.helpers import locking from fba.helpers import processing from fba.helpers import software as software_helper @@ -153,7 +154,7 @@ def fetch_pixelfed_api(args: argparse.Namespace) -> int: domain = row["domain"].encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif instances.is_registered(domain): @@ -224,7 +225,7 @@ def fetch_bkali(args: argparse.Namespace) -> int: elif entry["domain"] == "": logger.debug("entry[domain] is empty - SKIPPED!") continue - elif not utils.is_domain_wanted(entry["domain"]): + elif not domain_helper.is_wanted(entry["domain"]): logger.debug("entry[domain]='%s' is not wanted - SKIPPED!", entry["domain"]) continue elif instances.is_registered(entry["domain"]): @@ -314,7 +315,7 @@ def fetch_blocks(args: argparse.Namespace) -> int: elif nodeinfo_url is None or nodeinfo_url == "": logger.debug("blocker='%s',software='%s' has empty nodeinfo_url", blocker, software) continue - elif not utils.is_domain_wanted(blocker): + elif not domain_helper.is_wanted(blocker): logger.debug("blocker='%s' is not wanted - SKIPPED!", blocker) continue @@ -326,18 +327,23 @@ def fetch_blocks(args: argparse.Namespace) -> int: if software == "pleroma": logger.info("blocker='%s',software='%s'", blocker, software) blocking = pleroma.fetch_blocks(blocker, nodeinfo_url) + logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software) elif software == "mastodon": logger.info("blocker='%s',software='%s'", blocker, software) blocking = mastodon.fetch_blocks(blocker, nodeinfo_url) + logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software) elif software == "lemmy": logger.info("blocker='%s',software='%s'", blocker, software) blocking = lemmy.fetch_blocks(blocker, nodeinfo_url) + logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software) elif software == "friendica": logger.info("blocker='%s',software='%s'", blocker, software) blocking = friendica.fetch_blocks(blocker) + logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software) elif software == "misskey": logger.info("blocker='%s',software='%s'", blocker, software) blocking = misskey.fetch_blocks(blocker) + logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software) else: logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software) @@ -412,7 +418,7 @@ def fetch_blocks(args: argparse.Namespace) -> int: block["blocked"] = block["blocked"].lstrip(".").encode("idna").decode("utf-8") logger.debug("block[blocked]='%s' - AFTER!", block["blocked"]) - if not utils.is_domain_wanted(block["blocked"]): + if not domain_helper.is_wanted(block["blocked"]): logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"]) continue elif block["block_level"] in ["accept", "accepted"]: @@ -540,7 +546,7 @@ def fetch_observer(args: argparse.Namespace) -> int: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif instances.is_registered(domain): @@ -790,7 +796,7 @@ def fetch_fba_rss(args: argparse.Namespace) -> int: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif domain in domains: @@ -868,7 +874,7 @@ def fetch_fbabot_atom(args: argparse.Namespace) -> int: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif domain in domains: @@ -945,7 +951,7 @@ def fetch_instances(args: argparse.Namespace) -> int: domain = row["domain"].encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("Domain domain='%s' is not wanted - SKIPPED!", domain) continue @@ -1181,7 +1187,7 @@ def fetch_txt(args: argparse.Namespace) -> int: if domain == "": logger.debug("domain is empty - SKIPPED!") continue - elif not utils.is_domain_wanted(domain): + elif not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif instances.is_recent(domain): @@ -1242,7 +1248,7 @@ def fetch_fedipact(args: argparse.Namespace) -> int: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif instances.is_registered(domain): @@ -1293,7 +1299,7 @@ def fetch_joinmobilizon(args: argparse.Namespace) -> int: if "host" not in row: logger.warning("row='%s' does not contain key 'host' - SKIPPED!", row) continue - elif not utils.is_domain_wanted(row["host"]): + elif not domain_helper.is_wanted(row["host"]): logger.debug("row[host]='%s' is not wanted - SKIPPED!", row["host"]) continue elif instances.is_registered(row["host"]): @@ -1341,7 +1347,7 @@ def fetch_joinmisskey(args: argparse.Namespace) -> int: if "url" not in row: logger.warning("row()=%d does not have element 'url' - SKIPPED!", len(row)) continue - elif not utils.is_domain_wanted(row["url"]): + elif not domain_helper.is_wanted(row["url"]): logger.debug("row[url]='%s' is not wanted - SKIPPED!", row["url"]) continue elif instances.is_registered(row["url"]): @@ -1482,7 +1488,7 @@ def fetch_joinfediverse(args: argparse.Namespace) -> int: if block["blocked"] == "": logger.debug("block[blocked] is empty - SKIPPED!") continue - elif not utils.is_domain_wanted(block["blocked"]): + elif not domain_helper.is_wanted(block["blocked"]): logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"]) continue elif instances.is_recent(block["blocked"]): @@ -1505,7 +1511,7 @@ def fetch_joinfediverse(args: argparse.Namespace) -> int: if block["blocked"] == "": logger.debug("block[blocked] is empty - SKIPPED!") continue - elif not utils.is_domain_wanted(block["blocked"]): + elif not domain_helper.is_wanted(block["blocked"]): logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"]) continue @@ -1538,7 +1544,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int: logger.debug("Invoking locking.acquire() ...") locking.acquire() - if isinstance(args.domain, str) and args.domain != "" and utils.is_domain_wanted(args.domain): + if isinstance(args.domain, str) and args.domain != "" and domain_helper.is_wanted(args.domain): database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE has_obfuscation = 1 AND domain = ?", [args.domain]) elif isinstance(args.software, str) and args.software != "" and validators.domain(args.software) == args.software: database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE has_obfuscation = 1 AND software = ?", [args.software]) @@ -1570,7 +1576,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int: logger.debug("domain='%s',software='%s'", row["domain"], row["software"]) blocking = misskey.fetch_blocks(row["domain"]) else: - logger.warning("Unknown sofware: domain='%s',software='%s'", row["domain"], row["software"]) + logger.warning("Unknown software: domain='%s',software='%s'", row["domain"], row["software"]) logger.debug("row[domain]='%s'", row["domain"]) # chaos.social requires special care ... @@ -1602,7 +1608,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int: logger.debug("block='%s' is obfuscated.", block["blocked"]) obfuscated = obfuscated + 1 blocked = utils.deobfuscate(block["blocked"], row["domain"], block["hash"] if "hash" in block else None) - elif not utils.is_domain_wanted(block["blocked"]): + elif not domain_helper.is_wanted(block["blocked"]): logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"]) continue elif blocks.is_instance_blocked(row["domain"], block["blocked"]): @@ -1710,7 +1716,7 @@ def fetch_fedilist(args: argparse.Namespace) -> int: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif (args.force is None or not args.force) and instances.is_registered(domain): @@ -1836,7 +1842,7 @@ def fetch_instances_social(args: argparse.Namespace) -> int: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue elif domain in domains: @@ -1913,7 +1919,7 @@ def fetch_relays(args: argparse.Namespace) -> int: continue domain = str(domain) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue @@ -1956,7 +1962,7 @@ def fetch_relays(args: argparse.Namespace) -> int: components = urlparse(link["href"]) domain = components.netloc.lower() - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue diff --git a/fba/helpers/domain.py b/fba/helpers/domain.py index dd4e214..96aa189 100644 --- a/fba/helpers/domain.py +++ b/fba/helpers/domain.py @@ -20,6 +20,10 @@ from urllib.parse import urlparse import validators +from fba.helpers import blacklist + +from fba.models import instances + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -60,3 +64,38 @@ def is_in_url(domain: str, url: str) -> bool: logger.debug("is_found='%s' - EXIT!", is_found) return is_found + +def is_wanted(domain: str) -> bool: + logger.debug("domain='%s' - CALLED!", domain) + + wanted = True + if not isinstance(domain, str): + raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'") + elif domain == "": + raise ValueError("Parameter 'domain' is empty") + elif domain.lower() != domain: + wanted = False + elif not validators.domain(domain.split("/")[0]): + logger.debug("domain='%s' is not a valid domain name - setting False ...", domain) + wanted = False + elif domain.endswith(".arpa"): + logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain) + wanted = False + elif domain.endswith(".onion"): + logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain) + wanted = False + elif domain.endswith(".tld"): + logger.debug("domain='%s' is a fake domain - setting False ...", domain) + wanted = False + elif blacklist.is_blacklisted(domain): + logger.debug("domain='%s' is blacklisted - setting False ...", domain) + wanted = False + elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0): + logger.debug("domain='%s' is a single user", domain) + wanted = False + elif domain.find("/tag/") > 0: + logger.debug("domain='%s' is a tag", domain) + wanted = False + + logger.debug("wanted='%s' - EXIT!", wanted) + return wanted diff --git a/fba/helpers/processing.py b/fba/helpers/processing.py index 5ffbbfc..87c313f 100644 --- a/fba/helpers/processing.py +++ b/fba/helpers/processing.py @@ -46,7 +46,7 @@ def domain(name: str, blocker: str, command: str) -> bool: logger.debug("Flushing updates for blocker='%s' ...", blocker) instances.update_data(blocker) - if not utils.is_domain_wanted(name): + if not domain_helper.is_wanted(name): logger.debug("name='%s' is not wanted - SKIPPED!", name) return False elif instances.is_recent(name): diff --git a/fba/helpers/software.py b/fba/helpers/software.py index 76286a1..a93b086 100644 --- a/fba/helpers/software.py +++ b/fba/helpers/software.py @@ -36,7 +36,7 @@ def alias(software: str) -> str: software = "pleroma" elif "radiant" in software: logger.debug("Setting radiant: software='%s'", software) - sofware = "radiant" + software = "radiant" elif software in ["hometown", "ecko", "fedibird" ] or "되는 마스토돈" in software or "mastodon" in software: logger.debug("Setting mastodon: software='%s'", software) software = "mastodon" diff --git a/fba/helpers/version.py b/fba/helpers/version.py index 2ea6219..bb8dc9a 100644 --- a/fba/helpers/version.py +++ b/fba/helpers/version.py @@ -114,7 +114,7 @@ def strip_hosted_on(software: str) -> str: elif software == "": raise ValueError("Parameter 'software' is empty") elif "hosted on" not in software: - logger.warning("Cannot find 'hosted on' in sofware='%s'!", software) + logger.warning("Cannot find 'hosted on' in software='%s'!", software) return software end = software.find("hosted on ") diff --git a/fba/http/federation.py b/fba/http/federation.py index 5e490fc..0395eb9 100644 --- a/fba/http/federation.py +++ b/fba/http/federation.py @@ -22,7 +22,6 @@ import requests import validators from fba import csrf -from fba import utils from fba.helpers import config from fba.helpers import cookies @@ -126,7 +125,7 @@ def fetch_instances(domain: str, origin: str, software: str, command: str, path: instance = instance.encode("idna").decode("utf-8") logger.debug("instance='%s' - AFTER!", instance) - if not utils.is_domain_wanted(instance): + if not domain_helper.is_wanted(instance): logger.debug("instance='%s' is not wanted - SKIPPED!", instance) continue elif instance.find("/profile/") > 0 or instance.find("/users/") > 0 or (instances.is_registered(instance.split("/")[0]) and instance.find("/c/") > 0): @@ -417,7 +416,7 @@ def fetch_wellknown_nodeinfo(domain: str) -> dict: components = urlparse(url) logger.debug("components.netloc[]='%s'", type(components.netloc)) - if not utils.is_domain_wanted(components.netloc): + if not domain_helper.is_wanted(components.netloc): logger.debug("components.netloc='%s' is not wanted - SKIPPED!", components.netloc) continue @@ -662,7 +661,7 @@ def find_domains(tag: bs4.element.Tag) -> list: logger.debug("domain='%s',reason='%s'", domain, reason) - if not utils.is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is blacklisted - SKIPPED!", domain) continue elif domain == "gab.com/.ai, develop.gab.com": @@ -721,7 +720,7 @@ def add_peers(rows: dict) -> list: raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'") logger.debug("peer[%s]='%s' - AFTER!", type(peer), peer) - if not utils.is_domain_wanted(peer): + if not domain_helper.is_wanted(peer): logger.debug("peer='%s' is not wanted - SKIPPED!", peer) continue diff --git a/fba/models/instances.py b/fba/models/instances.py index f27d90e..69b6f15 100644 --- a/fba/models/instances.py +++ b/fba/models/instances.py @@ -421,7 +421,7 @@ def set_obfuscated_blocks(domain: str, obfuscated: int): domain_helper.raise_on(domain) if not isinstance(obfuscated, int): - raise ValueError(f"Parameter obfuscated[]='{type(blocks)}' is not of type 'int'") + raise ValueError(f"Parameter obfuscated[]='{type(obfuscated)}' is not of type 'int'") elif obfuscated < 0: raise ValueError(f"Parameter obfuscated={obfuscated} is not valid") diff --git a/fba/networks/friendica.py b/fba/networks/friendica.py index 96c2508..b1d34fc 100644 --- a/fba/networks/friendica.py +++ b/fba/networks/friendica.py @@ -84,7 +84,7 @@ def fetch_blocks(domain: str) -> list: if blocked == "": logger.debug("line[]='%s' returned empty blocked domain - SKIPPED!", type(line)) continue - elif not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", domain) continue @@ -92,7 +92,7 @@ def fetch_blocks(domain: str) -> list: blocked = utils.deobfuscate(blocked, domain) logger.debug("blocked[%s]='%s' - DEOBFUSCATED!", type(blocked), blocked) - if not utils.is_domain_wanted(blocked): + if not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue diff --git a/fba/networks/lemmy.py b/fba/networks/lemmy.py index 42c4eb4..7b427fc 100644 --- a/fba/networks/lemmy.py +++ b/fba/networks/lemmy.py @@ -20,7 +20,6 @@ import logging import bs4 from fba import csrf -from fba import utils from fba.helpers import config from fba.helpers import domain as domain_helper @@ -154,7 +153,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: logger.debug("Checking %d containers ...", len(containers)) for container in containers: logger.debug("container[]='%s'", type(container)) - for header in container.find_all(["h2", "h3", "h4", "h5"]): + for header in container.find_all(["h2", "h3", "h4", "h5"]): content = header logger.debug("header[%s]='%s' - BEFORE!", type(header), header) if header is not None: @@ -210,7 +209,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: if blocked == "": logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0]) continue - elif not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -271,7 +270,7 @@ def fetch_instances(domain: str, origin: str) -> list: if peer == "": logger.debug("peer is empty - SKIPPED!") continue - elif not utils.is_domain_wanted(peer): + elif not domain_helper.is_wanted(peer): logger.debug("peer='%s' is not wanted - SKIPPED!", peer) continue elif peer in peers: @@ -367,7 +366,7 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list: if peer == "": logger.debug("peer is empty - SKIPPED!") continue - elif not utils.is_domain_wanted(peer): + elif not domain_helper.is_wanted(peer): logger.debug("peer='%s' is not wanted - SKIPPED!", peer) continue elif peer in peers: diff --git a/fba/networks/misskey.py b/fba/networks/misskey.py index c254111..4227b2c 100644 --- a/fba/networks/misskey.py +++ b/fba/networks/misskey.py @@ -18,7 +18,6 @@ import json import logging from fba import csrf -from fba import utils from fba.helpers import config from fba.helpers import dicts as dict_helper @@ -107,7 +106,7 @@ def fetch_peers(domain: str) -> list: elif not isinstance(row["host"], str): logger.warning("row[host][]='%s' is not of type 'str' - SKIPPED!", type(row['host'])) continue - elif not utils.is_domain_wanted(row["host"]): + elif not domain_helper.is_wanted(row["host"]): logger.debug("row[host]='%s' is not wanted, domain='%s' - SKIPPED!", row['host'], domain) continue elif row["host"] in peers: diff --git a/fba/networks/pleroma.py b/fba/networks/pleroma.py index fbe03da..320e3f5 100644 --- a/fba/networks/pleroma.py +++ b/fba/networks/pleroma.py @@ -130,7 +130,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: if blocked == "": logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -138,7 +138,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: blocked = utils.deobfuscate(blocked, domain) logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) - if not utils.is_domain_wanted(blocked): + if not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -163,7 +163,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: if blocked == "": logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -171,7 +171,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: blocked = utils.deobfuscate(blocked, domain) logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) - if not utils.is_domain_wanted(blocked): + if not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -232,7 +232,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: if blocked == "": logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -266,7 +266,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: elif blocked == "": logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level) continue - elif not utils.is_domain_wanted(blocked): + elif not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -274,7 +274,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: blocked = utils.deobfuscate(blocked, domain) logger.debug("blocked='%s' - DEOBFUSCATED!", blocked) - if not utils.is_domain_wanted(blocked): + if not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -304,7 +304,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list: block["blocked"] = utils.deobfuscate(block["blocked"], domain) logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"]) - if not utils.is_domain_wanted(block["blocked"]): + if not domain_helper.is_wanted(block["blocked"]): logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"]) continue diff --git a/fba/utils.py b/fba/utils.py index ee2376b..44c8310 100644 --- a/fba/utils.py +++ b/fba/utils.py @@ -20,9 +20,7 @@ from urllib.parse import urlparse import bs4 import requests -import validators -from fba.helpers import blacklist from fba.helpers import config from fba.helpers import domain as domain_helper from fba.helpers import tidyup @@ -99,7 +97,7 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: domain = domain.encode("idna").decode("utf-8") logger.debug("domain='%s' - AFTER!", domain) - if not is_domain_wanted(domain): + if not domain_helper.is_wanted(domain): logger.debug("domain='%s' is not wanted - SKIPPED!", domain) continue @@ -109,41 +107,6 @@ def find_domains(tags: bs4.element.ResultSet, search: str) -> list: logger.debug("domains()=%d - EXIT!", len(domains)) return domains -def is_domain_wanted(domain: str) -> bool: - logger.debug("domain='%s' - CALLED!", domain) - - wanted = True - if not isinstance(domain, str): - raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'") - elif domain == "": - raise ValueError("Parameter 'domain' is empty") - elif domain.lower() != domain: - wanted = False - elif not validators.domain(domain.split("/")[0]): - logger.debug("domain='%s' is not a valid domain name - setting False ...", domain) - wanted = False - elif domain.endswith(".arpa"): - logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain) - wanted = False - elif domain.endswith(".onion"): - logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain) - wanted = False - elif domain.endswith(".tld"): - logger.debug("domain='%s' is a fake domain - setting False ...", domain) - wanted = False - elif blacklist.is_blacklisted(domain): - logger.debug("domain='%s' is blacklisted - setting False ...", domain) - wanted = False - elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0): - logger.debug("domain='%s' is a single user", domain) - wanted = False - elif domain.find("/tag/") > 0: - logger.debug("domain='%s' is a tag", domain) - wanted = False - - logger.debug("wanted='%s' - EXIT!", wanted) - return wanted - def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str: logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash) domain_helper.raise_on(blocker)