from datetime import datetime
from email.utils import format_datetime
from pathlib import Path
-from urllib.parse import urlparse
import fastapi
from fastapi import Request, HTTPException, Query
from fba.helpers import blacklist
from fba.helpers import config
+from fba.helpers import domain as domain_helper
from fba.helpers import json as json_helper
from fba.helpers import tidyup
)
elif mode in ["domain", "reverse"]:
domain = tidyup.domain(value)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
# Tidy up domain name
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
# Fetch domain data
def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
- elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
+ elif mode in ["domain", "reverse"] and not domain_helper.is_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
# Tidy up domain name
domain = tidyup.domain(domain).encode("idna").decode("utf-8")
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
raise HTTPException(status_code=500, detail=f"domain='{domain}' is not wanted")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/domain.json?domain={domain}")
from fba.helpers import config
from fba.helpers import cookies
from fba.helpers import dicts as dict_helper
+from fba.helpers import domain as domain_helper
from fba.helpers import locking
from fba.helpers import processing
from fba.helpers import software as software_helper
domain = row["domain"].encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
elif entry["domain"] == "":
logger.debug("entry[domain] is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(entry["domain"]):
+ elif not domain_helper.is_wanted(entry["domain"]):
logger.debug("entry[domain]='%s' is not wanted - SKIPPED!", entry["domain"])
continue
elif instances.is_registered(entry["domain"]):
elif nodeinfo_url is None or nodeinfo_url == "":
logger.debug("blocker='%s',software='%s' has empty nodeinfo_url", blocker, software)
continue
- elif not utils.is_domain_wanted(blocker):
+ elif not domain_helper.is_wanted(blocker):
logger.debug("blocker='%s' is not wanted - SKIPPED!", blocker)
continue
if software == "pleroma":
logger.info("blocker='%s',software='%s'", blocker, software)
blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
+ logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "mastodon":
logger.info("blocker='%s',software='%s'", blocker, software)
blocking = mastodon.fetch_blocks(blocker, nodeinfo_url)
+ logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "lemmy":
logger.info("blocker='%s',software='%s'", blocker, software)
blocking = lemmy.fetch_blocks(blocker, nodeinfo_url)
+ logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "friendica":
logger.info("blocker='%s',software='%s'", blocker, software)
blocking = friendica.fetch_blocks(blocker)
+ logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "misskey":
logger.info("blocker='%s',software='%s'", blocker, software)
blocking = misskey.fetch_blocks(blocker)
+ logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
else:
logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
block["blocked"] = block["blocked"].lstrip(".").encode("idna").decode("utf-8")
logger.debug("block[blocked]='%s' - AFTER!", block["blocked"])
- if not utils.is_domain_wanted(block["blocked"]):
+ if not domain_helper.is_wanted(block["blocked"]):
logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
continue
elif block["block_level"] in ["accept", "accepted"]:
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
domain = row["domain"].encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("Domain domain='%s' is not wanted - SKIPPED!", domain)
continue
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(domain):
+ elif not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_recent(domain):
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
if "host" not in row:
logger.warning("row='%s' does not contain key 'host' - SKIPPED!", row)
continue
- elif not utils.is_domain_wanted(row["host"]):
+ elif not domain_helper.is_wanted(row["host"]):
logger.debug("row[host]='%s' is not wanted - SKIPPED!", row["host"])
continue
elif instances.is_registered(row["host"]):
if "url" not in row:
logger.warning("row()=%d does not have element 'url' - SKIPPED!", len(row))
continue
- elif not utils.is_domain_wanted(row["url"]):
+ elif not domain_helper.is_wanted(row["url"]):
logger.debug("row[url]='%s' is not wanted - SKIPPED!", row["url"])
continue
elif instances.is_registered(row["url"]):
if block["blocked"] == "":
logger.debug("block[blocked] is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(block["blocked"]):
+ elif not domain_helper.is_wanted(block["blocked"]):
logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
continue
elif instances.is_recent(block["blocked"]):
if block["blocked"] == "":
logger.debug("block[blocked] is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(block["blocked"]):
+ elif not domain_helper.is_wanted(block["blocked"]):
logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
continue
logger.debug("Invoking locking.acquire() ...")
locking.acquire()
- if isinstance(args.domain, str) and args.domain != "" and utils.is_domain_wanted(args.domain):
+ if isinstance(args.domain, str) and args.domain != "" and domain_helper.is_wanted(args.domain):
database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE has_obfuscation = 1 AND domain = ?", [args.domain])
elif isinstance(args.software, str) and args.software != "" and validators.domain(args.software) == args.software:
database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE has_obfuscation = 1 AND software = ?", [args.software])
logger.debug("domain='%s',software='%s'", row["domain"], row["software"])
blocking = misskey.fetch_blocks(row["domain"])
else:
- logger.warning("Unknown sofware: domain='%s',software='%s'", row["domain"], row["software"])
+ logger.warning("Unknown software: domain='%s',software='%s'", row["domain"], row["software"])
logger.debug("row[domain]='%s'", row["domain"])
# chaos.social requires special care ...
logger.debug("block='%s' is obfuscated.", block["blocked"])
obfuscated = obfuscated + 1
blocked = utils.deobfuscate(block["blocked"], row["domain"], block["hash"] if "hash" in block else None)
- elif not utils.is_domain_wanted(block["blocked"]):
+ elif not domain_helper.is_wanted(block["blocked"]):
logger.debug("blocked='%s' is not wanted - SKIPPED!", block["blocked"])
continue
elif blocks.is_instance_blocked(row["domain"], block["blocked"]):
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif (args.force is None or not args.force) and instances.is_registered(domain):
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
continue
domain = str(domain)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
components = urlparse(link["href"])
domain = components.netloc.lower()
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
import validators
+from fba.helpers import blacklist
+
+from fba.models import instances
+
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.debug("is_found='%s' - EXIT!", is_found)
return is_found
+
+def is_wanted(domain: str) -> bool:
+ logger.debug("domain='%s' - CALLED!", domain)
+
+ wanted = True
+ if not isinstance(domain, str):
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
+ elif domain == "":
+ raise ValueError("Parameter 'domain' is empty")
+ elif domain.lower() != domain:
+ wanted = False
+ elif not validators.domain(domain.split("/")[0]):
+ logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
+ wanted = False
+ elif domain.endswith(".arpa"):
+ logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
+ wanted = False
+ elif domain.endswith(".onion"):
+ logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
+ wanted = False
+ elif domain.endswith(".tld"):
+ logger.debug("domain='%s' is a fake domain - setting False ...", domain)
+ wanted = False
+ elif blacklist.is_blacklisted(domain):
+ logger.debug("domain='%s' is blacklisted - setting False ...", domain)
+ wanted = False
+ elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
+ logger.debug("domain='%s' is a single user", domain)
+ wanted = False
+ elif domain.find("/tag/") > 0:
+ logger.debug("domain='%s' is a tag", domain)
+ wanted = False
+
+ logger.debug("wanted='%s' - EXIT!", wanted)
+ return wanted
logger.debug("Flushing updates for blocker='%s' ...", blocker)
instances.update_data(blocker)
- if not utils.is_domain_wanted(name):
+ if not domain_helper.is_wanted(name):
logger.debug("name='%s' is not wanted - SKIPPED!", name)
return False
elif instances.is_recent(name):
software = "pleroma"
elif "radiant" in software:
logger.debug("Setting radiant: software='%s'", software)
- sofware = "radiant"
+ software = "radiant"
elif software in ["hometown", "ecko", "fedibird" ] or "되는 마스토돈" in software or "mastodon" in software:
logger.debug("Setting mastodon: software='%s'", software)
software = "mastodon"
elif software == "":
raise ValueError("Parameter 'software' is empty")
elif "hosted on" not in software:
- logger.warning("Cannot find 'hosted on' in sofware='%s'!", software)
+ logger.warning("Cannot find 'hosted on' in software='%s'!", software)
return software
end = software.find("hosted on ")
import validators
from fba import csrf
-from fba import utils
from fba.helpers import config
from fba.helpers import cookies
instance = instance.encode("idna").decode("utf-8")
logger.debug("instance='%s' - AFTER!", instance)
- if not utils.is_domain_wanted(instance):
+ if not domain_helper.is_wanted(instance):
logger.debug("instance='%s' is not wanted - SKIPPED!", instance)
continue
elif instance.find("/profile/") > 0 or instance.find("/users/") > 0 or (instances.is_registered(instance.split("/")[0]) and instance.find("/c/") > 0):
components = urlparse(url)
logger.debug("components.netloc[]='%s'", type(components.netloc))
- if not utils.is_domain_wanted(components.netloc):
+ if not domain_helper.is_wanted(components.netloc):
logger.debug("components.netloc='%s' is not wanted - SKIPPED!", components.netloc)
continue
logger.debug("domain='%s',reason='%s'", domain, reason)
- if not utils.is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
elif domain == "gab.com/.ai, develop.gab.com":
raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'")
logger.debug("peer[%s]='%s' - AFTER!", type(peer), peer)
- if not utils.is_domain_wanted(peer):
+ if not domain_helper.is_wanted(peer):
logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
continue
domain_helper.raise_on(domain)
if not isinstance(obfuscated, int):
- raise ValueError(f"Parameter obfuscated[]='{type(blocks)}' is not of type 'int'")
+ raise ValueError(f"Parameter obfuscated[]='{type(obfuscated)}' is not of type 'int'")
elif obfuscated < 0:
raise ValueError(f"Parameter obfuscated={obfuscated} is not valid")
if blocked == "":
logger.debug("line[]='%s' returned empty blocked domain - SKIPPED!", type(line))
continue
- elif not utils.is_domain_wanted(blocked):
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", domain)
continue
blocked = utils.deobfuscate(blocked, domain)
logger.debug("blocked[%s]='%s' - DEOBFUSCATED!", type(blocked), blocked)
- if not utils.is_domain_wanted(blocked):
+ if not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
import bs4
from fba import csrf
-from fba import utils
from fba.helpers import config
from fba.helpers import domain as domain_helper
logger.debug("Checking %d containers ...", len(containers))
for container in containers:
logger.debug("container[]='%s'", type(container))
- for header in container.find_all(["h2", "h3", "h4", "h5"]):
+ for header in container.find_all(["h2", "h3", "h4", "h5"]):
content = header
logger.debug("header[%s]='%s' - BEFORE!", type(header), header)
if header is not None:
if blocked == "":
logger.warning("blocked='%s' is empty after tidyup.domain() - SKIPPED!", tag.contents[0])
continue
- elif not utils.is_domain_wanted(blocked):
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
if peer == "":
logger.debug("peer is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(peer):
+ elif not domain_helper.is_wanted(peer):
logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
continue
elif peer in peers:
if peer == "":
logger.debug("peer is empty - SKIPPED!")
continue
- elif not utils.is_domain_wanted(peer):
+ elif not domain_helper.is_wanted(peer):
logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
continue
elif peer in peers:
import logging
from fba import csrf
-from fba import utils
from fba.helpers import config
from fba.helpers import dicts as dict_helper
elif not isinstance(row["host"], str):
logger.warning("row[host][]='%s' is not of type 'str' - SKIPPED!", type(row['host']))
continue
- elif not utils.is_domain_wanted(row["host"]):
+ elif not domain_helper.is_wanted(row["host"]):
logger.debug("row[host]='%s' is not wanted, domain='%s' - SKIPPED!", row['host'], domain)
continue
elif row["host"] in peers:
if blocked == "":
logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s' - SKIPPED!", domain, block_level)
continue
- elif not utils.is_domain_wanted(blocked):
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
blocked = utils.deobfuscate(blocked, domain)
logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
- if not utils.is_domain_wanted(blocked):
+ if not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
if blocked == "":
logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
continue
- elif not utils.is_domain_wanted(blocked):
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
blocked = utils.deobfuscate(blocked, domain)
logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
- if not utils.is_domain_wanted(blocked):
+ if not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
if blocked == "":
logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
continue
- elif not utils.is_domain_wanted(blocked):
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif blocked == "":
logger.warning("blocked is empty after tidyup.domain(): domain='%s',block_level='%s'", domain, block_level)
continue
- elif not utils.is_domain_wanted(blocked):
+ elif not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
blocked = utils.deobfuscate(blocked, domain)
logger.debug("blocked='%s' - DEOBFUSCATED!", blocked)
- if not utils.is_domain_wanted(blocked):
+ if not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
block["blocked"] = utils.deobfuscate(block["blocked"], domain)
logger.debug("block[blocked]='%s' - DEOBFUSCATED!", block["blocked"])
- if not utils.is_domain_wanted(block["blocked"]):
+ if not domain_helper.is_wanted(block["blocked"]):
logger.debug("block[blocked]='%s' is not wanted - SKIPPED!", block["blocked"])
continue
import bs4
import requests
-import validators
-from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup
domain = domain.encode("idna").decode("utf-8")
logger.debug("domain='%s' - AFTER!", domain)
- if not is_domain_wanted(domain):
+ if not domain_helper.is_wanted(domain):
logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
logger.debug("domains()=%d - EXIT!", len(domains))
return domains
-def is_domain_wanted(domain: str) -> bool:
- logger.debug("domain='%s' - CALLED!", domain)
-
- wanted = True
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- wanted = False
- elif not validators.domain(domain.split("/")[0]):
- logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
- wanted = False
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
- wanted = False
- elif domain.endswith(".onion"):
- logger.debug("domain='%s' is a TOR .onion domain - setting False ...", domain)
- wanted = False
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - setting False ...", domain)
- wanted = False
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - setting False ...", domain)
- wanted = False
- elif domain.find("/profile/") > 0 or domain.find("/users/") > 0 or (instances.is_registered(domain.split("/")[0]) and domain.find("/c/") > 0):
- logger.debug("domain='%s' is a single user", domain)
- wanted = False
- elif domain.find("/tag/") > 0:
- logger.debug("domain='%s' is a tag", domain)
- wanted = False
-
- logger.debug("wanted='%s' - EXIT!", wanted)
- return wanted
-
def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
logger.debug("domain='%s',blocker='%s',domain_hash='%s' - CALLED!", domain, blocker, domain_hash)
domain_helper.raise_on(blocker)