import requests
import validators
-from fba import fba
+from fba import database
+from fba import utils
from fba.helpers import config
from fba.helpers import tidyup
@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def api_info():
- fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
- row = fba.cursor.fetchone()
+ database.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_error_details IS NOT NULL)")
+ row = database.cursor.fetchone()
return {
"known_instances" : row[0],
raise HTTPException(status_code=400, detail="Too many results")
if mode == "blocked":
- fba.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
elif mode == "blocker":
- fba.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
elif mode == "reference":
- fba.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
elif mode == "software":
- fba.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
elif mode == "command":
- fba.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
+ database.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
elif mode == "error_code":
- fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
elif mode == "avg_peers":
- fba.cursor.execute("SELECT software, AVG(total_peers) AS sum FROM instances WHERE software IS NOT NULL GROUP BY software HAVING sum>0 ORDER BY sum DESC LIMIT ?", [amount])
+ database.cursor.execute("SELECT software, AVG(total_peers) AS sum FROM instances WHERE software IS NOT NULL GROUP BY software HAVING sum>0 ORDER BY sum DESC LIMIT ?", [amount])
else:
raise HTTPException(status_code=400, detail="No filter specified")
scores = list()
- for domain, score in fba.cursor.fetchall():
+ for domain, score in database.cursor.fetchall():
scores.append({
"domain": domain,
"score" : round(score)
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
- (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
+ (domain, "*." + domain, wildchar, utils.get_hash(domain), punycode, "*." + punycode))
elif reverse is not None:
reverse = tidyup.domain(reverse)
if not validators.domain(reverse):
raise HTTPException(status_code=500, detail="Invalid domain")
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
else:
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
- blocklist = fba.cursor.fetchall()
+ blocklist = database.cursor.fetchall()
result = {}
for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
@router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
def api_mutual(domains: list[str] = Query()):
"""Return 200 if federation is open between the two, 4xx otherwise"""
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT block_level FROM blocks " \
"WHERE ((blocker = :a OR blocker = :b) AND (blocked = :b OR blocked = :a OR blocked = :aw OR blocked = :bw)) " \
"AND block_level = 'reject' " \
"bw": "*." + domains[1],
},
)
- response = fba.cursor.fetchone()
+ response = database.cursor.fetchone()
if response is not None:
# Blocks found
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode("idna").decode("utf-8")
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT ?", [
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT ?", [
domain,
"*." + domain, wildchar,
- fba.get_hash(domain),
+ utils.get_hash(domain),
punycode,
"*." + punycode,
config.get("rss_limit")
])
else:
- fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT ?", [config.get("rss_limit")])
+ database.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks ORDER BY first_seen DESC LIMIT ?", [config.get("rss_limit")])
- result = fba.cursor.fetchall()
+ result = database.cursor.fetchall()
blocklist = []
for row in result:
'boot',
'commands',
'csrf',
- 'fba',
+ 'database',
+ 'utils',
# Sub packages:
'helpers',
'http',
import argparse
from fba import commands
-from fba import fba
+from fba import database
from fba.helpers import locking
def shutdown():
logger.debug("Closing database connection ...")
- fba.connection.close()
+ database.connection.close()
locking.release()
logger.debug("Shutdown completed.")
import reqto
import validators
-from fba import fba
+from fba import database
+from fba import utils
from fba.helpers import blacklist
from fba.helpers import config
if "domain" not in entry:
logger.warning("entry()=%d does not contain 'domain' - SKIPPED!", len(entry))
continue
- elif not validators.domain(entry["domain"]):
- logger.warning("domain='%s' is not a valid domain - SKIPPED!", entry['domain'])
- continue
- elif entry["domain"].endswith(".arpa"):
- logger.debug("entry[domain]='%s' is a domain for reversed IP addresses - SKIPPED!", entry["domain"])
- continue
- elif entry["domain"].endswith(".tld"):
- logger.debug("entry[domain]='%s' is a fake domain - SKIPPED!", entry['domain'])
- continue
- elif blacklist.is_blacklisted(entry["domain"]):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", entry['domain'])
+ elif not utils.is_domain_wanted(entry["domain"]):
+ logger.debug("entry[domain]='%s' is not wanted - SKIPPED!", entry["domain"])
continue
elif instances.is_registered(entry["domain"]):
logger.debug("domain='%s' is already registered - SKIPPED!", entry['domain'])
logger.info("Fetching instances from domain='%s' ...", domain)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({domain}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_bkali) from domain='{domain}'")
logger.warning(f"domain='{args.domain}' is blacklisted, won't check it!")
return
elif not instances.is_registered(args.domain):
- logger.warning(f"domain='{args.domain}' is not registered, please run ./fba.py fetch_instances {args.domain} first.")
+ logger.warning(f"domain='{args.domain}' is not registered, please run ./fba.py fetch_instances {args.domain} first.")
return
locking.acquire()
if args.domain is not None and args.domain != "":
# Re-check single domain
logger.debug(f"Querying database for single args.domain='{args.domain}' ...")
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT domain, software, origin, nodeinfo_url FROM instances WHERE domain = ?", [args.domain]
)
elif args.software is not None and args.software != "":
# Re-check single software
logger.debug(f"Querying database for args.software='{args.software}' ...")
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software = ?", [args.software]
)
else:
# Re-check after "timeout" (aka. minimum interval)
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'peertube') AND (last_blocked IS NULL OR last_blocked < ?) ORDER BY rowid DESC", [time.time() - config.get("recheck_block")]
)
- rows = fba.cursor.fetchall()
+ rows = database.cursor.fetchall()
logger.info("Checking %d entries ...", len(rows))
for blocker, software, origin, nodeinfo_url in rows:
logger.debug("BEFORE blocker='%s',software='%s',origin='%s',nodeinfo_url='%s'", blocker, software, origin, nodeinfo_url)
logger.debug(f"blocked='{blocked}',reason='{reason}' - BEFORE!")
blocked = tidyup.domain(blocked)
reason = tidyup.reason(reason) if reason is not None and reason != "" else None
- logger.debug(f"blocked='{blocked}',reason='{reason}' - AFTER!")
+ logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
if blocked == "":
logger.warning("blocked is empty, blocker='%s'", blocker)
nodeinfo_url = row[2]
logger.debug("Looking up instance by domainm, blocked='%s'", blocked)
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='{software}' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.debug("blocked='%s' is a domain for reversed IP addresses - SKIPPED!", blocked)
- continue
- elif blocked.endswith(".tld"):
- logger.debug(f"blocked='{blocked}' is a fake domain - SKIPPED!")
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", blocked, blocker)
logger.debug(f"Invoking cookies.clear({blocked}) ...")
cookies.clear(blocked)
- logger.debug("Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
else:
logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
try:
logger.debug(f"Fetching table data for software='{software}' ...")
- raw = fba.fetch_url(f"https://fediverse.observer/app/views/tabledata.php?software={software}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
+ raw = utils.fetch_url(f"https://fediverse.observer/app/views/tabledata.php?software={software}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
logger.debug(f"raw[{type(raw)}]()={len(raw)}")
doc = bs4.BeautifulSoup(raw, features='html.parser')
domain = item.decode_contents()
logger.debug("domain='%s'", domain)
- if not validators.domain(domain.split("/")[0]):
- logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
- continue
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
+ if not utils.is_domain_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
logger.debug("domain='%s' is already registered - SKIPPED!", domain)
continue
- logger.info(f"Fetching instances for domain='{domain}',software='{software}'")
+ logger.info("Fetching instances for domain='%s',software='%s'", domain, software)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({domain}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
logger.debug("EXIT!")
"reject": list(),
}
- raw = fba.fetch_url("https://wiki.todon.eu/todon/domainblocks", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
+ raw = utils.fetch_url("https://wiki.todon.eu/todon/domainblocks", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
logger.debug("raw()=%d,raw[]='%s'", len(raw), type(raw))
doc = bs4.BeautifulSoup(raw, "html.parser")
silenced = doc.find("h3", {"id": "silencedlimited_servers"}).find_next("ul").findAll("li")
logger.info("Checking %d silenced/limited entries ...", len(silenced))
- blocklist["silenced"] = fba.find_domains(silenced, "div")
+ blocklist["silenced"] = utils.find_domains(silenced, "div")
suspended = doc.find("h3", {"id": "suspended_servers"}).find_next("ul").findAll("li")
logger.info("Checking %d suspended entries ...", len(suspended))
- blocklist["reject"] = fba.find_domains(suspended, "div")
+ blocklist["reject"] = utils.find_domains(suspended, "div")
for block_level in blocklist:
blockers = blocklist[block_level]
if not instances.is_registered(blocked):
try:
- logger.info(f"Fetching instances from domain='{row['domain']}' ...")
+ logger.info("Fetching instances from domain='%s' ...", row['domain'])
federation.fetch_instances(blocked, 'chaos.social', None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({row['domain']}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", row['domain'])
cookies.clear(blocked)
except network.exceptions as exception:
logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
blocks.add_instance("todon.eu", blocked, None, block_level)
logger.debug("Invoking commit() ...")
- fba.connection.commit()
+ database.connection.commit()
logger.debug("EXIT!")
"reject" : list(),
}
- raw = fba.fetch_url("https://raw.githubusercontent.com/chaossocial/meta/master/federation.md", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
+ raw = utils.fetch_url("https://raw.githubusercontent.com/chaossocial/meta/master/federation.md", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
logger.debug("raw()=%d,raw[]='%s'", len(raw), type(raw))
doc = bs4.BeautifulSoup(markdown.markdown(raw, extensions=extensions), features='html.parser')
logger.debug(f"row='{row}'")
if not instances.is_registered(row["domain"]):
try:
- logger.info(f"Fetching instances from domain='{row['domain']}' ...")
+ logger.info("Fetching instances from domain='%s' ...", row['domain'])
federation.fetch_instances(row["domain"], 'chaos.social', None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({row['domain']}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", row['domain'])
cookies.clear(row["domain"])
except network.exceptions as exception:
logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
logger.debug(f"domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
blocks.add_instance('chaos.social', row["domain"], row["reason"], block_level)
- logger.debug("Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
logger.debug("EXIT!")
logger.debug("args[]='%s' - CALLED!", type(args))
domains = list()
- logger.info(f"Fetch FBA-specific RSS args.feed='{args.feed}' ...")
- response = fba.fetch_url(args.feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ logger.info("Fetch FBA-specific RSS args.feed='%s' ...", args.feed)
+ response = utils.fetch_url(args.feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- logger.debug(f"response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
logger.debug(f"Parsing RSS feed ({len(response.text)} Bytes) ...")
rss = atoma.parse_rss_bytes(response.content)
if len(domains) > 0:
locking.acquire()
- logger.info(f"Adding {len(domains)} new instances ...")
+ logger.info("Adding %d new instances ...", len(domains))
for domain in domains:
try:
- logger.info(f"Fetching instances from domain='{domain}' ...")
+ logger.info("Fetching instances from domain='%s' ...", domain)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({domain}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_fba_rss) from domain='{domain}'")
domains = list()
logger.info(f"Fetching ATOM feed='{feed}' from FBA bot account ...")
- response = fba.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = utils.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- logger.debug(f"response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
logger.debug(f"Parsing ATOM feed ({len(response.text)} Bytes) ...")
atom = atoma.parse_atom_bytes(response.content)
logger.info(f"Fetching instances from domain='{domain}' ...")
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({domain}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_fbabot_atom) from domain='{domain}'")
return 0
# Loop through some instances
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT domain, origin, software, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'friendica', 'misskey', 'lemmy', 'peertube') AND (last_instance_fetch IS NULL OR last_instance_fetch < ?) ORDER BY rowid DESC", [time.time() - config.get("recheck_instance")]
)
- rows = fba.cursor.fetchall()
+ rows = database.cursor.fetchall()
logger.info("Checking %d entries ...", len(rows))
for row in rows:
logger.debug(f"domain='{row[0]}'")
# Fetch this URL
logger.info(f"Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
- response = fba.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = utils.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.content != "":
logger.debug(f"row='{row}' does not contain domain column")
continue
- if not validators.domain(domain):
- logger.warning("domain='%s' is not a valid domain name - SKIPPED!", domain)
- continue
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
+ if not utils.is_domain_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
logger.debug(f"Marking domain='{domain}' as handled")
domains.append(domain)
logger.debug(f"Processing domain='{domain}' ...")
- processed = fba.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
+ processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
logger.debug(f"processed='{processed}'")
logger.info(f"Checking {len(urls)} text file(s) ...")
for url in urls:
logger.debug("Fetching url='%s' ...", url)
- response = fba.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = utils.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "":
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not validators.domain(domain):
- logger.warning("domain='%s' is not a valid domain name - SKIPPED!", domain)
- continue
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
+ elif not utils.is_domain_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
logger.debug("domain='%s'", domain)
- processed = fba.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
+ processed = utils.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
logger.debug(f"processed='{processed}'")
if not processed:
logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
- response = fba.fetch_url("https://fedipact.online", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = utils.fetch_url("https://fedipact.online", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "":
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
- elif not validators.domain(domain):
- logger.warning("domain='%s' is not a valid domain name - SKIPPED!", domain)
- continue
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
+ elif not utils.is_domain_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
logger.debug("domain='%s' is already registered - SKIPPED!", domain)
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"response.ok='{response.ok}',response.status_code={response.status_code},response.text()={len(response.text)}")
- if response.ok and response.status_code < 300 and response.text.find("<html") > 0:
+ logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
+ if response.ok and response.status_code < 300 and response.text != "" and response.text.find("<html") > 0:
# Save cookies
logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
cookies.store(domain, response.cookies.get_dict())
--- /dev/null
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+import sqlite3
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Connect to database
+connection = sqlite3.connect("blocks.db")
+cursor = connection.cursor()
+++ /dev/null
-# Copyright (C) 2023 Free Software Foundation
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published
-# by the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-import hashlib
-import logging
-import sqlite3
-
-from urllib.parse import urlparse
-
-import bs4
-import requests
-import validators
-
-from fba.helpers import blacklist
-from fba.helpers import cookies
-from fba.helpers import tidyup
-
-from fba.http import federation
-from fba.http import network
-
-from fba.models import instances
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-# Connect to database
-connection = sqlite3.connect("blocks.db")
-cursor = connection.cursor()
-
-##### Other functions #####
-
-def is_primitive(var: any) -> bool:
- logger.debug(f"var[]='{type(var)}' - CALLED!")
- return type(var) in {int, str, float, bool} or var is None
-
-def get_hash(domain: str) -> str:
- logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
- elif not validators.domain(domain.split("/")[0]):
- raise ValueError(f"domain='{domain}' is not a valid domain")
- elif domain.endswith(".arpa"):
- raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
- elif domain.endswith(".tld"):
- raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-
- return hashlib.sha256(domain.encode("utf-8")).hexdigest()
-
-def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
- logger.debug(f"url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
- if not isinstance(url, str):
- raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
- elif url == "":
- raise ValueError("Parameter 'url' is empty")
- elif not isinstance(headers, dict):
- raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
- elif not isinstance(timeout, tuple):
- raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
-
- logger.debug(f"Parsing url='{url}'")
- components = urlparse(url)
-
- # Invoke other function, avoid trailing ?
- logger.debug(f"components[{type(components)}]={components}")
- if components.query != "":
- response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
- else:
- response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
-
- logger.debug(f"response[]='{type(response)}' - EXXIT!")
- return response
-
-def process_domain(domain: str, blocker: str, command: str) -> bool:
- logger.debug(f"domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
- elif not validators.domain(domain.split("/")[0]):
- raise ValueError(f"domain='{domain}' is not a valid domain")
- elif domain.endswith(".arpa"):
- raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
- elif domain.endswith(".tld"):
- raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
- elif not isinstance(blocker, str):
- raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
- elif blocker == "":
- raise ValueError("Parameter 'blocker' is empty")
- elif not validators.domain(blocker.split("/")[0]):
- raise ValueError(f"blocker='{blocker}' is not a valid domain")
- elif blocker.endswith(".arpa"):
- raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
- elif blocker.endswith(".tld"):
- raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
- elif not isinstance(command, str):
- raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
- elif command == "":
- raise ValueError("Parameter 'command' is empty")
-
- if domain.find("*") > 0:
- # Try to de-obscure it
- row = instances.deobscure("*", domain)
-
- logger.debug(f"row[{type(row)}]='{row}'")
- if row is None:
- logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!")
- return False
-
- logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
- domain = row[0]
- elif domain.find("?") > 0:
- # Try to de-obscure it
- row = instances.deobscure("?", domain)
-
- logger.debug(f"row[{type(row)}]='{row}'")
- if row is None:
- logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!")
- return False
-
- logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
- domain = row[0]
-
- if not validators.domain(domain.split("/")[0]):
- logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
- return False
- elif domain.endswith(".arpa"):
- logger.warning(f"domain='{domain}' is a reversed .arpa domain and should not be used generally.")
- return False
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
- return False
- elif instances.is_recent(domain):
- logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
- return False
-
- processed = False
- try:
- logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
- federation.fetch_instances(domain, blocker, None, command)
- processed = True
-
- logger.debug(f"Invoking cookies.clear({domain}) ...")
- cookies.clear(domain)
- except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
- instances.set_last_error(domain, exception)
-
- logger.debug(f"processed='{processed}' - EXIT!")
- return processed
-
-def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
- logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
- if not isinstance(tags, bs4.element.ResultSet):
- raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
- elif not isinstance(search, str):
- raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
- elif search == "":
- raise ValueError("Parameter 'search' is empty")
-
- domains = list()
- for tag in tags:
- logger.debug("tag[]='%s'", type(tag))
- domain = tidyup.domain(tag.find(search).contents[0])
- logger.debug("domain='%s'", domain)
- if domain == "":
- logger.debug("tag='%s' has no domain, trying <em> ...", tag)
- domain = tidyup.domain(tag.find("em").contents[0])
-
- logger.debug("domain='%s'", domain)
- if not validators.domain(domain):
- logger.debug("domain='%s' is not a valid domain name - SKIPPED!", domain)
- continue
- elif domain.endswith(".arpa"):
- logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
- continue
-
- logger.debug("Appending domain='%s'", domain)
- domains.append(domain)
-
- logger.debug("domains()=%d - EXIT!", len(domains))
- return domains
import validators
from fba import csrf
+from fba import utils
from fba.helpers import blacklist
from fba.helpers import config
if instance == "":
logger.warning(f"Empty instance after tidyup.domain(), domain='{domain}'")
continue
- elif not validators.domain(instance.split("/")[0]):
- logger.warning(f"Bad instance='{instance}' from domain='{domain}',origin='{origin}'")
- continue
- elif instance.endswith(".arpa"):
- logger.warning(f"instance='{instance}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blacklist.is_blacklisted(instance):
- logger.debug("instance is blacklisted:", instance)
+ elif not utils.is_domain_wanted(instance):
+ logger.debug("instance='%s' is not wanted - SKIPPED!", instance)
continue
elif instance.find("/profile/") > 0 or instance.find("/users/") > 0:
- logger.debug(f"instance='{instance}' is a link to a single user profile - SKIPPED!")
- continue
- elif instance.endswith(".tld"):
- logger.debug(f"instance='{instance}' is a fake domain - SKIPPED!")
+ logger.debug("instance='%s' is a link to a single user profile - SKIPPED!", instance)
continue
elif not instances.is_registered(instance):
logger.debug("Adding new instance:", instance, domain)
url = f"https://{domain}{url}"
components = urlparse(url)
- if not validators.domain(components.netloc):
- logger.warning(f"components.netloc='{components.netloc}' is not a valid domain - SKIPPED!")
- continue
- elif domain.endswith(".arpa"):
- logger.warning("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.warning("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(components.netloc):
- logger.debug(f"components.netloc='{components.netloc}' is blacklisted - SKIPPED!")
+ if not utils.is_domain_wanted(components.netloc):
+ logger.debug("components.netloc='%s' is not wanted - SKIPPED!", components.netloc)
continue
logger.debug("Fetching nodeinfo from:", url)
logger.debug(f"Fetching path='{path}' from '{domain}' ...")
response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- logger.debug("domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
+ logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text.find("<html") > 0:
logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
+
doc = bs4.BeautifulSoup(response.text, "html.parser")
+ logger.debug("doc[]='%s'", type(doc))
- logger.debug("doc[]:", type(doc))
generator = doc.find("meta", {"name" : "generator"})
site_name = doc.find("meta", {"property": "og:site_name"})
- logger.debug(f"generator='{generator}',site_name='{site_name}'")
+ logger.debug("generator[]='%s',site_name[]='%s'", type(generator), type(site_name))
if isinstance(generator, bs4.element.Tag) and isinstance(generator.get("content"), str):
logger.debug("Found generator meta tag:", domain)
software = tidyup.domain(generator.get("content"))
+
logger.debug("software[%s]='%s'", type(software), software)
if software is not None and software != "":
logger.info("domain='%s' is generated by '%s'", domain, software)
elif isinstance(site_name, bs4.element.Tag) and isinstance(site_name.get("content"), str):
logger.debug("Found property=og:site_name:", domain)
software = tidyup.domain(site_name.get("content"))
+
logger.debug("software[%s]='%s'", type(software), software)
if software is not None and software != "":
logger.info("domain='%s' has og:site_name='%s'", domain, software)
logger.debug("software[]='%s'", type(software))
if isinstance(software, str) and software == "":
- logger.debug(f"Corrected empty string to None for software of domain='{domain}'")
+ logger.debug("Corrected empty string to None for software of domain='%s'", domain)
software = None
elif isinstance(software, str) and ("." in software or " " in software):
logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
logger.debug("domain='%s',reason='%s'", domain, reason)
- if not validators.domain(domain.split("/")[0]):
- logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
- continue
- elif domain.endswith(".arpa"):
- logger.warning("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
- continue
- elif domain.endswith(".tld"):
- logger.warning("domain='%s' is a fake domain - SKIPPED!", domain)
- continue
- elif blacklist.is_blacklisted(domain):
+ if not utils.is_domain_wanted(domain):
logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
elif domain == "gab.com/.ai, develop.gab.com":
raise ValueError(f"peer[]='{type(peer)}' is not supported,key='{key}'")
logger.debug(f"peer='{peer}' - AFTER!")
- if not validators.domain(peer):
- logger.warning(f"peer='{peer}' is not a valid domain - SKIPPED!")
- continue
- elif peer.endswith(".arpa"):
- logger.warning(f"peer='{peer}' is a domain for reversed IP addresses -SKIPPED!")
- continue
- elif peer.endswith(".tld"):
- logger.warning(f"peer='{peer}' is a fake domain - SKIPPED!")
- continue
- elif blacklist.is_blacklisted(peer):
- logger.debug(f"peer='{peer}' is blacklisted - SKIPPED!")
+ if not utils.is_domain_wanted(peer):
+ logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
continue
logger.debug(f"Adding peer='{peer}' ...")
import urllib3
import validators
-from fba import fba
+from fba import utils
from fba.helpers import config
from fba.helpers import cookies
try:
logger.debug("Fetching url='%s' ...", url)
- response = fba.fetch_url(url, api_headers, timeout)
+ response = utils.fetch_url(url, api_headers, timeout)
json_reply["json"] = json_from_response(response)
import time
import validators
-from fba import fba
+from fba import database
from fba.helpers import blacklist
from fba.helpers import tidyup
raise ValueError("Accepted domains are not wanted here")
logger.debug("Updating block reason:", reason, blocker, blocked, block_level)
- fba.cursor.execute(
+ database.cursor.execute(
"UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND (reason IS NULL OR reason = '') LIMIT 1",
[
reason,
elif block_level == "accept":
raise ValueError("Accepted domains are not wanted here")
- fba.cursor.execute(
+ database.cursor.execute(
"UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
[
time.time(),
elif block_level == "accept":
raise ValueError("Accepted domains are not wanted here")
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
(
blocker,
),
)
- is_blocked = fba.cursor.fetchone() is not None
+ is_blocked = database.cursor.fetchone() is not None
logger.debug(f"is_blocked='{is_blocked}' - EXIT!")
return is_blocked
logger.info("New block: blocker='%s',blocked='%s',reason='%s',block_level='%s'", blocker, blocked, reason, block_level)
- fba.cursor.execute(
+ database.cursor.execute(
"INSERT INTO blocks (blocker, blocked, reason, block_level, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
[
blocker,
import validators
-from fba import fba
+from fba import database
from fba.helpers import config
logger.debug("AFTER error[]:", type(error))
if isinstance(error, str):
- fba.cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
+ database.cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
domain,
error,
time.time()
])
else:
- fba.cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
+ database.cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
domain,
error["status_code"],
error["error_message"],
# Cleanup old entries
logger.debug(f"Purging old records (distance: {config.get('error_log_cleanup')})")
- fba.cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
+ database.cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
logger.debug("EXIT!")
import requests
import validators
-from fba import fba
+from fba import database
+from fba import utils
from fba.helpers import blacklist
from fba.helpers import cache
raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
elif not key in _pending:
raise ValueError(f"key='{key}' not found in _pending")
- elif not fba.is_primitive(value):
+ elif not utils.is_primitive(value):
raise ValueError(f"value[]='{type(value)}' is not a primitive type")
# Set it
logger.debug("sql_string:", sql_string)
logger.debug("Executing SQL:", sql_string)
- fba.cursor.execute(sql_string, fields)
+ database.cursor.execute(sql_string, fields)
- logger.debug(f"Success! (rowcount={fba.cursor.rowcount })")
- if fba.cursor.rowcount == 0:
+ logger.debug(f"Success! (rowcount={database.cursor.rowcount })")
+ if database.cursor.rowcount == 0:
raise Exception(f"Did not update any rows: domain='{domain}',fields()={len(fields)}")
- logger.debug("Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
logger.debug(f"Deleting _pending for domain='{domain}'")
for key in _pending:
return
logger.info("Adding instance domain='%s' (origin='%s',software='%s')", domain, origin, software)
- fba.cursor.execute(
+ database.cursor.execute(
"INSERT INTO instances (domain, origin, command, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
(
domain,
origin,
command,
- fba.get_hash(domain),
+ utils.get_hash(domain),
software,
time.time()
),
logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not cache.key_exists("is_registered"):
logger.debug("Cache for 'is_registered' not initialized, fetching all rows ...")
- fba.cursor.execute("SELECT domain FROM instances")
+ database.cursor.execute("SELECT domain FROM instances")
# Check Set all
- cache.set_all("is_registered", fba.cursor.fetchall(), True)
+ cache.set_all("is_registered", database.cursor.fetchall(), True)
# Is cache found?
registered = cache.sub_key_exists("is_registered", domain)
return False
# Query database
- fba.cursor.execute("SELECT last_instance_fetch FROM instances WHERE domain = ? LIMIT 1", [domain])
+ database.cursor.execute("SELECT last_instance_fetch FROM instances WHERE domain = ? LIMIT 1", [domain])
# Fetch row
- fetched = fba.cursor.fetchone()[0]
+ fetched = database.cursor.fetchone()[0]
logger.debug(f"fetched[{type(fetched)}]='{fetched}'")
recently = isinstance(fetched, float) and time.time() - fetched <= config.get("recheck_instance")
if isinstance(blocked_hash, str):
logger.debug(f"Looking up blocked_hash='{blocked_hash}' ...")
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? LIMIT 1", [blocked_hash]
)
- row = fba.cursor.fetchone()
+ row = database.cursor.fetchone()
logger.debug("row[]='%s'", type(row))
if row is None:
return deobscure(char, domain)
else:
logger.debug(f"Looking up domain='{domain}' ...")
- fba.cursor.execute(
+ database.cursor.execute(
"SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [domain.replace(char, "_")]
)
- row = fba.cursor.fetchone()
+ row = database.cursor.fetchone()
logger.debug("row[]='%s'", type(row))
logger.debug(f"row[]='{type(row)}' - EXIT!")
import bs4
import validators
+from fba import utils
+
from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import tidyup
reason = tidyup.reason(line.find_all("td")[1].text)
logger.debug(f"blocked='{blocked}',reason='{reason}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}' is not a valid domain - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug(f"Appending blocked='{blocked}',reason='{reason}'")
import validators
from fba import csrf
-from fba import fba
+from fba import database
+from fba import utils
from fba.helpers import blacklist
from fba.helpers import config
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"response.ok='{response.ok}',response.status_code={response.status_code},response.text()={len(response.text)}")
+ logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "":
logger.debug(f"Parsing {len(response.text)} Bytes ...")
blocked = tidyup.domain(tag.contents[0])
logger.debug(f"blocked='{blocked}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}' is not a valid domain - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug("Hash wasn't found, adding:", blocked, domain)
logger.debug(f"Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
blocks.update_last_seen(domain, blocked, "reject")
- logger.debug("Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
except network.exceptions as exception:
logger.warning(f"domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
instances.set_last_error(domain, exception)
import validators
from fba import csrf
-from fba import fba
+from fba import database
+from fba import utils
from fba.helpers import blacklist
from fba.helpers import config
logger.info("Checking %d entries from domain='%s' ...", len(blocklist), domain)
for block in blocklist:
# Check type
- logger.debug(f"block[]='{type(block)}'")
+ logger.debug("block[]='%s'", type(block))
if not isinstance(block, dict):
logger.debug(f"block[]='{type(block)}' is of type 'dict' - SKIPPED!")
continue
"reason": block["comment"] if "comment" in block else None
}
- logger.debug("severity,domain,hash,comment:", block['severity'], block['domain'], block['digest'], block['comment'])
+ logger.debug("severity='%s',domain='%s',hash='%s',comment='%s'", block['severity'], block['domain'], block['digest'], block['comment'])
if block['severity'] == 'suspend':
- logger.debug(f"Adding entry='{entry}' with severity='{block['severity']}' ...")
+ logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
rows['reject'].append(entry)
elif block['severity'] == 'silence':
- logger.debug(f"Adding entry='{entry}' with severity='{block['severity']}' ...")
+ logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
rows['followers_only'].append(entry)
elif block['severity'] == 'reject_media':
- logger.debug(f"Adding entry='{entry}' with severity='{block['severity']}' ...")
+ logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
rows['media_removal'].append(entry)
elif block['severity'] == 'reject_reports':
- logger.debug(f"Adding entry='{entry}' with severity='{block['severity']}' ...")
+ logger.debug("Adding entry='%s' with severity='%s' ...", entry, block['severity'])
rows['report_removal'].append(entry)
else:
- logger.warning(f"Unknown severity='{block['severity']}', domain='{block['domain']}'")
+ logger.warning("Unknown severity='%s', domain='%s'", block['severity'], block['domain'])
else:
- logger.debug(f"domain='{domain}' has returned zero rows, trying /about/more page ...")
+ logger.debug("domain='%s' has returned zero rows, trying /about/more page ...", domain)
rows = fetch_blocks_from_about(domain)
logger.info("Checking %d entries from domain='%s' ...", len(rows.items()), domain)
for block_level, blocklist in rows.items():
- logger.debug("domain,block_level,blocklist():", domain, block_level, len(blocklist))
+ logger.debug("domain='%s',block_level='%s',blocklist()=%d", domain, block_level, len(blocklist))
block_level = tidyup.domain(block_level)
- logger.debug("AFTER-block_level:", block_level)
+ logger.debug("block_level='%s' - AFTER!", block_level)
if block_level == "":
- logger.warning("block_level is empty, domain:", domain)
+ logger.warning("block_level is empty, domain='%s'", domain)
continue
elif block_level == "accept":
- logger.debug(f"domain='{domain}' skipping block_level='accept'")
+ logger.debug("domain='%s' skipping block_level='accept'", domain)
continue
logger.debug(f"Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
for block in blocklist:
- logger.debug(f"block[]='{type(block)}'")
+ logger.debug("block[]='%s'", type(block))
blocked, blocked_hash, reason = block.values()
logger.debug(f"blocked='{blocked}',blocked_hash='{blocked_hash}',reason='{reason}':")
blocked = tidyup.domain(blocked)
reason = tidyup.reason(reason) if reason is not None and reason != "" else None
- logger.debug(f"blocked='{blocked}',reason='{reason}' - AFTER!")
+ logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
if blocked == "":
logger.warning("blocked is empty, domain='%s'", domain)
logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
- logger.debug("Updating domain: ", row[0])
+ logger.debug("Updating domain: row[0]='%s'", row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
- logger.debug("Updating domain: ", row[0])
+ logger.debug("Updating domain: row[0]='%s'", row[0])
blocked = row[0]
origin = row[1]
nodeinfo_url = row[2]
logger.debug("Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
logger.debug("Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug("Hash wasn't found, adding:", blocked, domain)
blocks.update_last_seen(domain, blocked, block_level)
blocks.update_reason(reason, domain, blocked, block_level)
- logger.debug("Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
except network.exceptions as exception:
logger.warning(f"domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
instances.set_last_error(domain, exception)
import validators
from fba import csrf
+from fba import utils
-from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import dicts
from fba.helpers import tidyup
elif not isinstance(row["host"], str):
logger.warning(f"row[host][]='{type(row['host'])}' is not 'str' - SKIPPED!")
continue
- elif not validators.domain(row["host"].split("/")[0]):
- logger.warning(f"row[host]='{row['host']}' is not a valid domain - SKIPPED!")
- continue
- elif row["host"].endswith(".arpa"):
- logger.warning(f"row[host]='{row['host']}' is a domain for reversed IP addresses - SKIPPED!")
- continue
- elif row["host"].endswith(".tld"):
- logger.warning(f"row[host]='{row['host']}' is a fake domain - SKIPPED!")
- continue
- elif blacklist.is_blacklisted(row["host"]):
- logger.debug(f"row[host]='{row['host']}' is blacklisted. domain='{domain}' - SKIPPED!")
+ elif not utils.is_domain_wanted(row["host"]):
+ logger.debug("row[host]='%s' is not wanted, domain='%s' - SKIPPED!", row["host"], domain)
continue
elif row["host"] in peers:
logger.debug(f"Not adding row[host]='{row['host']}', already found.")
import bs4
import validators
-from fba import fba
+from fba import database
+from fba import utils
from fba.helpers import blacklist
from fba.helpers import config
logger.warning("block_level is now empty!")
continue
elif block_level == "accept":
- logger.debug(f"domain='{domain}' skipping block_level='accept'")
+ logger.debug("domain='%s' skipping block_level='accept'", domain)
continue
logger.debug(f"Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
nodeinfo_url = row[2]
logger.debug(f"blocked='{blocked}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
# Commit changes
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
nodeinfo_url = row[2]
logger.debug(f"blocked='{blocked}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
# Commit changes
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
else:
logger.warning(f"Cannot find 'mrf_simple' or 'quarantined_instances' in JSON reply: domain='{domain}'")
- logger.debug("Committing changes ...")
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
# Reasons
if "mrf_simple_info" in data:
logger.warning("block_level is now empty!")
continue
elif block_level == "accept":
- logger.debug(f"domain='{domain}' skipping block_level='accept'")
+ logger.debug("domain='%s' skipping block_level='accept'", domain)
continue
- logger.debug(f"Checking {len(info.items())} entries from domain='{domain}',software='pleroma',block_level='{block_level}' ...")
+ logger.debug(f"Checking {len(info.items())} entries from domain='{domain}',block_level='{block_level}' ...")
for blocked, reason in info.items():
logger.debug(f"blocked='{blocked}',reason[{type(reason)}]='{reason}' - BEFORE!")
blocked = tidyup.domain(blocked)
elif reason is not None:
raise ValueError(f"Cannot handle reason[]='{type(reason)}'")
- logger.debug(f"blocked='{blocked}',reason='{reason}' - AFTER!")
+ logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
if blocked == "":
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
nodeinfo_url = row[2]
logger.debug(f"blocked='{blocked}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
nodeinfo_url = row[2]
logger.debug(f"blocked='{blocked}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
- continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
logger.debug(f"record[]='{type(record)}'")
blocked = tidyup.domain(record["blocked"])
reason = tidyup.reason(record["reason"])
- logger.debug(f"blocked='{blocked}',reason='{reason}' - AFTER!")
+ logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
if blocked == "":
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
nodeinfo_url = row[2]
logger.debug(f"blocked='{blocked}'")
- if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='pleroma' is not a valid domain name - SKIPPED!")
- continue
- elif blocked.endswith(".arpa"):
- logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
- elif blocked.endswith(".tld"):
- logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
+ if not utils.is_domain_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
logger.debug(f"Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
blocks.update_reason(reason, domain, blocked, block_level)
- fba.connection.commit()
+ logger.debug("Invoking commit() ...")
+ database.connection.commit()
+
logger.debug("EXIT!")
def fetch_blocks_from_about(domain: str) -> dict:
--- /dev/null
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import hashlib
+import logging
+
+from urllib.parse import urlparse
+
+import bs4
+import requests
+import validators
+
+from fba.helpers import blacklist
+from fba.helpers import cookies
+from fba.helpers import tidyup
+
+from fba.http import federation
+from fba.http import network
+
+from fba.models import instances
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+##### Other functions #####
+
+def is_primitive(var: any) -> bool:
+    """Return True when var is a primitive value (int, str, float, bool) or None."""
+    # NOTE(review): the annotation 'any' is the builtin function, not typing.Any —
+    # harmless at runtime but misleading to type checkers; consider typing.Any.
+    # Lazy %-style logging avoids formatting the message when DEBUG is disabled.
+    logger.debug("var[]='%s' - CALLED!", type(var))
+    return type(var) in {int, str, float, bool} or var is None
+
+def get_hash(domain: str) -> str:
+    """Return the SHA-256 hex digest of a validated, all-lower-case domain name.
+
+    Raises ValueError when 'domain' is not a str, is empty, contains upper-case
+    characters, fails domain validation, or is a .arpa/.tld pseudo-domain.
+    """
+    logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
+
+    # Guard clauses: each violation aborts immediately, so plain 'if' statements
+    # are equivalent to the usual elif chain here.
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    if domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    if domain.lower() != domain:
+        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+    if not validators.domain(domain.split("/")[0]):
+        raise ValueError(f"domain='{domain}' is not a valid domain")
+    if domain.endswith(".arpa"):
+        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
+    if domain.endswith(".tld"):
+        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+
+    return hashlib.sha256(domain.encode("utf-8")).hexdigest()
+
+def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
+    """Fetch a full URL by delegating to network.fetch_response().
+
+    Parameters:
+    url     -- absolute URL to fetch
+    headers -- HTTP request headers
+    timeout -- (connect, read) timeout tuple for requests
+
+    Returns the requests Response object. Raises ValueError on bad parameters.
+    """
+    logger.debug("url='%s',headers()=%d,timeout='%s' - CALLED!", url, len(headers), timeout)
+    if not isinstance(url, str):
+        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+    elif url == "":
+        raise ValueError("Parameter 'url' is empty")
+    elif not isinstance(headers, dict):
+        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
+    elif not isinstance(timeout, tuple):
+        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
+
+    logger.debug("Parsing url='%s' ...", url)
+    components = urlparse(url)
+
+    # Invoke other function, avoid trailing ?
+    logger.debug("components[%s]='%s'", type(components), components)
+    if components.query != "":
+        response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
+    else:
+        # An empty path must be requested as "/"
+        response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
+
+    # BUG FIX: log message read "EXXIT!"; also switched to lazy %-formatting.
+    logger.debug("response[]='%s' - EXIT!", type(response))
+    return response
+
+def process_domain(domain: str, blocker: str, command: str) -> bool:
+    """Validate, optionally de-obscure, and fetch instances for one domain.
+
+    Parameters:
+    domain  -- domain to process (may contain '*' or '?' obscuring characters)
+    blocker -- blocking domain this entry was found on
+    command -- name of the invoking command (recorded with the fetch)
+
+    Returns True when instances were fetched, False when the domain was
+    skipped or fetching failed. Raises ValueError on invalid parameters.
+    """
+    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif domain.lower() != domain:
+        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+    # NOTE(review): an obscured domain ('*'/'?') may fail this validators.domain
+    # check and raise before the de-obscure code below is ever reached — confirm
+    # the intended validation order with the callers.
+    elif not validators.domain(domain.split("/")[0]):
+        raise ValueError(f"domain='{domain}' is not a valid domain")
+    elif domain.endswith(".arpa"):
+        raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
+    elif domain.endswith(".tld"):
+        raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+    elif not isinstance(blocker, str):
+        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
+    elif blocker == "":
+        raise ValueError("Parameter 'blocker' is empty")
+    elif not validators.domain(blocker.split("/")[0]):
+        raise ValueError(f"blocker='{blocker}' is not a valid domain")
+    elif blocker.endswith(".arpa"):
+        raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
+    elif blocker.endswith(".tld"):
+        raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
+    elif not isinstance(command, str):
+        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
+    elif command == "":
+        raise ValueError("Parameter 'command' is empty")
+
+    # De-obscure '*' or '?' obscured domains; the two former copy-pasted
+    # branches are merged, 'break' preserves the original elif semantics.
+    for char in ("*", "?"):
+        if domain.find(char) > 0:
+            row = instances.deobscure(char, domain)
+
+            logger.debug("row[%s]='%s'", type(row), row)
+            if row is None:
+                # BUG FIX: message read "de-obfucate"
+                logger.warning("Cannot de-obfuscate domain='%s' - SKIPPED!", domain)
+                return False
+
+            logger.debug("domain='%s' de-obscured to '%s'", domain, row[0])
+            domain = row[0]
+            break
+
+    # BUG FIX: the following 'if' was missing its trailing colon (SyntaxError).
+    if not is_domain_wanted(domain):
+        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+        return False
+    elif instances.is_recent(domain):
+        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
+        return False
+
+    processed = False
+    try:
+        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
+        federation.fetch_instances(domain, blocker, None, command)
+        processed = True
+
+        logger.debug("Invoking cookies.clear(%s) ...", domain)
+        cookies.clear(domain)
+    except network.exceptions as exception:
+        logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
+        instances.set_last_error(domain, exception)
+
+    logger.debug("processed='%s' - EXIT!", processed)
+    return processed
+
+def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
+    """Extract wanted domain names from a BeautifulSoup result set.
+
+    Parameters:
+    tags   -- ResultSet of tags to scan
+    search -- tag name whose first content element holds the domain
+
+    Returns a list of tidied domain names that pass is_domain_wanted().
+    Raises ValueError on invalid parameters.
+    """
+    logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
+    if not isinstance(tags, bs4.element.ResultSet):
+        raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
+    elif not isinstance(search, str):
+        raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
+    elif search == "":
+        raise ValueError("Parameter 'search' is empty")
+
+    domains = list()
+    for tag in tags:
+        logger.debug("tag[]='%s'", type(tag))
+        domain = tidyup.domain(tag.find(search).contents[0])
+
+        logger.debug("domain='%s'", domain)
+        if domain == "":
+            # Some rows keep the domain in an <em> element instead
+            logger.debug("tag='%s' has no domain, trying <em> ...", tag)
+            domain = tidyup.domain(tag.find("em").contents[0])
+
+        if not is_domain_wanted(domain):
+            # BUG FIX: the '%s' placeholder previously had no argument
+            logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+            continue
+
+        logger.debug("Appending domain='%s'", domain)
+        domains.append(domain)
+
+    logger.debug("domains()=%d - EXIT!", len(domains))
+    return domains
+
+def is_domain_wanted(domain: str) -> bool:
+    """Return True when 'domain' should be crawled.
+
+    A domain is unwanted when it fails domain-name validation, is a .arpa
+    reverse-IP domain, a fake .tld domain, or is blacklisted. Type errors,
+    an empty string, or upper-case characters raise ValueError instead.
+    """
+    logger.debug("domain='%s' - CALLED!", domain)
+    wanted = True
+
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif domain.lower() != domain:
+        raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+    # BUG FIX below: debug messages read "settings False"
+    elif not validators.domain(domain.split("/")[0]):
+        logger.debug("domain='%s' is not a valid domain name - setting False ...", domain)
+        wanted = False
+    elif domain.endswith(".arpa"):
+        logger.debug("domain='%s' is a domain for reversed IP addresses - setting False ...", domain)
+        wanted = False
+    elif domain.endswith(".tld"):
+        logger.debug("domain='%s' is a fake domain - setting False ...", domain)
+        wanted = False
+    elif blacklist.is_blacklisted(domain):
+        logger.debug("domain='%s' is blacklisted - setting False ...", domain)
+        wanted = False
+
+    logger.debug("wanted='%s' - EXIT!", wanted)
+    return wanted