for entry in rows["data"]["nodeinfo"]:
logger.debug(f"entry['{type(entry)}']='{entry}'")
if "domain" not in entry:
- logger.warning(f"entry()={len(entry)} does not contain 'domain' - SKIPPED!")
+ logger.warning("entry()=%d does not contain 'domain' - SKIPPED!", len(entry))
continue
elif not validators.domain(entry["domain"]):
- logger.warning(f"domain='{entry['domain']}' is not a valid domain - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain - SKIPPED!", entry['domain'])
+ continue
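+ # Skip pseudo domains: .arpa is reverse-DNS space and .tld is a placeholder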
+ elif entry["domain"].endswith(".arpa"):
+ logger.debug("entry[domain]='%s' is a domain for reversed IP addresses - SKIPPED!", entry["domain"])
+ continue
+ elif entry["domain"].endswith(".tld"):
+ logger.debug("entry[domain]='%s' is a fake domain - SKIPPED!", entry['domain'])
continue
elif blacklist.is_blacklisted(entry["domain"]):
- logger.debug(f"domain='{entry['domain']}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", entry['domain'])
continue
elif instances.is_registered(entry["domain"]):
- logger.debug(f"domain='{entry['domain']}' is already registered - SKIPPED!")
- continue
- elif instances.is_recent(entry["domain"]):
- logger.debug(f"domain='{entry['domain']}' has been recently fetched - SKIPPED!")
+ logger.debug("domain='%s' is already registered - SKIPPED!", entry['domain'])
continue
logger.debug(f"Adding domain='{entry['domain']}' ...")
rows = fba.cursor.fetchall()
logger.info("Checking %d entries ...", len(rows))
for blocker, software, origin, nodeinfo_url in rows:
- logger.debug("BEFORE blocker,software,origin,nodeinfo_url:", blocker, software, origin, nodeinfo_url)
+ logger.debug("BEFORE blocker='%s',software='%s',origin='%s',nodeinfo_url='%s'", blocker, software, origin, nodeinfo_url)
blockdict = list()
blocker = tidyup.domain(blocker)
- logger.debug("AFTER blocker,software:", blocker, software)
+ logger.debug("AFTER blocker='%s',software='%s'", blocker, software)
if blocker == "":
logger.warning("blocker is now empty!")
block_level = tidyup.domain(block_level)
logger.debug("AFTER-block_level='%s'", block_level)
if block_level == "":
- logger.warning("block_level is empty, blocker:", blocker)
+ logger.warning("block_level is empty, blocker='%s'", blocker)
continue
logger.debug(f"Checking {len(blocklist)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
origin = row[1]
nodeinfo_url = row[2]
- logger.debug("Looking up instance by domain:", blocked)
+ logger.debug("Looking up instance by domainm, blocked='%s'", blocked)
if not validators.domain(blocked):
logger.warning(f"blocked='{blocked}',software='{software}' is not a valid domain name - SKIPPED!")
continue
logger.debug(f"blocked='{blocked}' is a fake domain - SKIPPED!")
continue
elif not instances.is_registered(blocked):
- logger.debug("Hash wasn't found, adding:", blocked, blocker)
+ logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", blocked, blocker)
try:
instances.add(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
except network.exceptions as exception:
logger.debug("Committing changes ...")
fba.connection.commit()
else:
- logger.warning("Unknown software:", blocker, software)
+ logger.warning("Unknown software: blocker='%s',software='%s'", blocker, software)
if instances.has_pending(blocker):
- logger.debug(f"Invoking instances.update_data({blocker}) ...")
+ logger.debug("Invoking instances.update_data(%s) ...", blocker)
instances.update_data(blocker)
if config.get("bot_enabled") and len(blockdict) > 0:
if not validators.domain(domain.split("/")[0]):
logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
continue
+ elif domain.endswith(".arpa"):
+ logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
+ continue
+ elif domain.endswith(".tld"):
+ logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
+ continue
elif blacklist.is_blacklisted(domain):
logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
logger.debug("EXIT!")
+def fetch_todon_wiki(args: argparse.Namespace):
+ logger.debug("args[]='%s' - CALLED!", type(args))
+
+ locking.acquire()
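+ # Domains found on the wiki, grouped by block level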
+ blocklist = {
+ "silenced": list(),
+ "reject": list(),
+ }
+
+ raw = fba.fetch_url("https://wiki.todon.eu/todon/domainblocks", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
+ logger.debug("raw()=%d,raw[]='%s'", len(raw), type(raw))
+
+ doc = bs4.BeautifulSoup(raw, "html.parser")
+ logger.debug("doc[]='%s'", type(doc))
+
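+ # The wiki lists each block level as <li> entries in a <ul> following its own <h3> heading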
+ silenced = doc.find("h3", {"id": "silencedlimited_servers"}).find_next("ul").findAll("li")
+ logger.info("Checking %d silenced/limited entries ...", len(silenced))
+ blocklist["silenced"] = fba.find_domains(silenced, "div")
+
+ suspended = doc.find("h3", {"id": "suspended_servers"}).find_next("ul").findAll("li")
+ logger.info("Checking %d suspended entries ...", len(suspended))
+ blocklist["reject"] = fba.find_domains(suspended, "div")
+
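+ # Record every found domain as a block imposed by todon.eu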
+ for block_level in blocklist:
+ blockers = blocklist[block_level]
+
+ logger.debug("block_level='%s',blockers()=%d'", block_level, len(blockers))
+ for blocked in blockers:
+ logger.debug("blocked='%s'", blocked)
+
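+ # Instances not yet in the database are crawled before the block is recorded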
+ if not instances.is_registered(blocked):
+ try:
+ logger.info(f"Fetching instances from domain='{row['domain']}' ...")
+ federation.fetch_instances(blocked, 'todon.eu', None, inspect.currentframe().f_code.co_name)
+
+ logger.debug(f"Invoking cookies.clear({row['domain']}) ...")
+ cookies.clear(blocked)
+ except network.exceptions as exception:
+ logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
+ instances.set_last_error(blocked, exception)
+
+ if blocks.is_instance_blocked("todon.eu", blocked, block_level):
+ logger.debug("blocked='%s',block_level='%s' is already blocked - SKIPPED!", blocked, block_level)
+ continue
+
+ logger.info("Adding new block: blocked='%s',block_level='%s'", blocked, block_level)
+ blocks.add_instance("todon.eu", blocked, None, block_level)
+
+ logger.debug("Invoking commit() ...")
+ fba.connection.commit()
+
+ logger.debug("EXIT!")
+
def fetch_cs(args: argparse.Namespace):
logger.debug("args[]='%s' - CALLED!", type(args))
extensions = [
}
raw = fba.fetch_url("https://raw.githubusercontent.com/chaossocial/meta/master/federation.md", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
- logger.debug(f"raw()={len(raw)}[]='{type(raw)}'")
+ logger.debug("raw()=%d,raw[]='%s'", len(raw), type(raw))
doc = bs4.BeautifulSoup(markdown.markdown(raw, extensions=extensions), features='html.parser')
-
logger.debug(f"doc()={len(doc)}[]='{type(doc)}'")
+
silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table").find("tbody")
- logger.debug(f"silenced[]='{type(silenced)}'")
- domains["silenced"] = domains["silenced"] + federation.find_domains(silenced)
+ logger.debug("silenced[%s]()=%d", type(silenced), len(silenced))
+ domains["silenced"] = federation.find_domains(silenced)
blocked = doc.find("h2", {"id": "blocked-instances"}).findNext("table").find("tbody")
- logger.debug(f"blocked[]='{type(blocked)}'")
- domains["reject"] = domains["reject"] + federation.find_domains(blocked)
+ logger.debug("blocked[%s]()=%d", type(blocked), len(blocked))
+ domains["reject"] = federation.find_domains(blocked)
- logger.debug(f"domains()={len(domains)}")
+ logger.debug("domains[silenced]()=%d,domains[reject]()=%d", len(domains["silenced"]), len(domains["reject"]))
if len(domains) > 0:
locking.acquire()
- logger.info(f"Adding {len(domains)} new instances ...")
for block_level in domains:
- logger.debug(f"block_level='{block_level}'")
+ logger.info("block_level='%s' has %d row(s)", block_level, len(domains[block_level]))
for row in domains[block_level]:
logger.debug(f"row='{row}'")
- if not blocks.is_instance_blocked('chaos.social', row["domain"], block_level):
- logger.debug(f"domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
- blocks.add_instance('chaos.social', row["domain"], row["reason"], block_level)
-
if not instances.is_registered(row["domain"]):
try:
logger.info(f"Fetching instances from domain='{row['domain']}' ...")
logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
instances.set_last_error(row["domain"], exception)
+ if not blocks.is_instance_blocked('chaos.social', row["domain"], block_level):
+ logger.debug(f"domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
+ blocks.add_instance('chaos.social', row["domain"], row["reason"], block_level)
+
logger.debug("Committing changes ...")
fba.connection.commit()
for row in rows:
logger.debug(f"domain='{row[0]}'")
if blacklist.is_blacklisted(row[0]):
- logger.warning("domain is blacklisted:", row[0])
+ logger.warning("domain is blacklisted: row[0]='%s'", row[0])
continue
try:
from urllib.parse import urlparse
+import bs4
import requests
import validators
from fba.helpers import blacklist
from fba.helpers import cookies
+from fba.helpers import tidyup
from fba.http import federation
from fba.http import network
raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
elif blocker == "":
raise ValueError("Parameter 'blocker' is empty")
+ elif not validators.domain(blocker.split("/")[0]):
+ raise ValueError(f"blocker='{blocker}' is not a valid domain")
+ elif blocker.endswith(".arpa"):
+ raise ValueError(f"blocker='{blocker}' is a domain for reversed IP addresses, please don't crawl them!")
+ elif blocker.endswith(".tld"):
+ raise ValueError(f"blocker='{blocker}' is a fake domain, please don't crawl them!")
elif not isinstance(command, str):
raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
elif command == "":
logger.debug(f"processed='{processed}' - EXIT!")
return processed
+
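+# Extracts validated, non-blacklisted domain names from a bs4 result set.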
+def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
+ logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
+ if not isinstance(tags, bs4.element.ResultSet):
+ raise ValueError(f"Parameter tags[]='{type(tags)}' is not 'ResultSet'")
+ elif not isinstance(search, str):
+ raise ValueError(f"Parameter search[]='{type(search)}' is not 'str'")
+ elif search == "":
+ raise ValueError("Parameter 'search' is empty")
+
+ domains = list()
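+ # Each tag should carry the domain in a child element named by 'search'; an <em> child is used as fallback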
+ for tag in tags:
+ logger.debug("tag[]='%s'", type(tag))
+ domain = tidyup.domain(tag.find(search).contents[0])
+ logger.debug("domain='%s'", domain)
+ if domain == "":
+ logger.debug("tag='%s' has no domain, trying <em> ...", tag)
+ domain = tidyup.domain(tag.find("em").contents[0])
+
+ logger.debug("domain='%s'", domain)
+ if not validators.domain(domain):
+ logger.debug("domain='%s' is not a valid domain name - SKIPPED!", domain)
+ continue
+ elif domain.endswith(".arpa"):
+ logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
+ continue
+ elif domain.endswith(".tld"):
+ logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
+ continue
+ elif blacklist.is_blacklisted(domain):
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
+ continue
+
+ logger.debug("Appending domain='%s'", domain)
+ domains.append(domain)
+
+ logger.debug("domains()=%d - EXIT!", len(domains))
+ return domains