logger = logging.getLogger(__name__)
def check_instance(args: argparse.Namespace) -> int:
- logger.debug(f"args.domain='{args.domain}' - CALLED!")
+ logger.debug("args.domain='%s' - CALLED!", args.domain)
status = 0
if not validators.domain(args.domain):
- logger.warning(f"args.domain='{args.domain}' is not valid")
+ logger.warning("args.domain='%s' is not valid", args.domain)
status = 100
elif blacklist.is_blacklisted(args.domain):
- logger.warning(f"args.domain='{args.domain}' is blacklisted")
+ logger.warning("args.domain='%s' is blacklisted", args.domain)
status = 101
elif instances.is_registered(args.domain):
- logger.warning(f"args.domain='{args.domain}' is already registered")
+ logger.warning("args.domain='%s' is already registered", args.domain)
status = 102
else:
- logger.info(f"args.domain='{args.domain}' is not known")
+ logger.info("args.domain='%s' is not known", args.domain)
logger.debug(f"status={status} - EXIT!")
return status
def fetch_bkali(args: argparse.Namespace) -> int:
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
domains = list()
try:
fetched = network.post_json_api("gql.api.bka.li", "/v1/graphql", json.dumps({
"query": "query domainlist {nodeinfo(order_by: {domain: asc}) {domain}}"
}))
- logger.debug(f"fetched[]='{type(fetched)}'")
+ logger.debug("fetched[]='%s'", type(fetched))
if "error_message" in fetched:
logger.warning(f"post_json_api() for 'gql.api.bka.li' returned error message: {fetched['error_message']}")
return 100
if len(domains) > 0:
locking.acquire()
- logger.info(f"Adding {len(domains)} new instances ...")
+ logger.info("Adding %d new instances ...", len(domains))
for domain in domains:
try:
- logger.info(f"Fetching instances from domain='{domain}' ...")
+ logger.info("Fetching instances from domain='%s' ...", domain)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
logger.debug(f"Invoking cookies.clear({domain}) ...")
return 0
def fetch_blocks(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
if args.domain is not None and args.domain != "":
logger.debug(f"args.domain='{args.domain}' - checking ...")
if not validators.domain(args.domain):
)
rows = fba.cursor.fetchall()
- logger.info(f"Checking {len(rows)} entries ...")
+ logger.info("Checking %d entries ...", len(rows))
for blocker, software, origin, nodeinfo_url in rows:
logger.debug("BEFORE blocker,software,origin,nodeinfo_url:", blocker, software, origin, nodeinfo_url)
blockdict = list()
instances.set_last_blocked(blocker)
if software == "pleroma":
- logger.info(f"blocker='{blocker}',software='{software}'")
+ logger.info("blocker='%s',software='%s'", blocker, software)
pleroma.fetch_blocks(blocker, origin, nodeinfo_url)
elif software == "mastodon":
- logger.info(f"blocker='{blocker}',software='{software}'")
+ logger.info("blocker='%s',software='%s'", blocker, software)
mastodon.fetch_blocks(blocker, origin, nodeinfo_url)
elif software == "lemmy":
- logger.info(f"blocker='{blocker}',software='{software}'")
+ logger.info("blocker='%s',software='%s'", blocker, software)
lemmy.fetch_blocks(blocker, origin, nodeinfo_url)
elif software == "friendica" or software == "misskey":
- logger.info(f"blocker='{blocker}',software='{software}'")
+ logger.info("blocker='%s',software='%s'", blocker, software)
blocking = list()
if software == "friendica":
elif software == "misskey":
blocking = misskey.fetch_blocks(blocker)
- logger.info(f"Checking {len(blocking.items())} entries from blocker='{blocker}',software='{software}' ...")
+ logger.info("Checking %d entries from blocker='%s',software='%s' ...", len(blocking.items()), blocker, software)
for block_level, blocklist in blocking.items():
- logger.debug("blocker,block_level,blocklist():", blocker, block_level, len(blocklist))
+ logger.debug("blocker='%s',block_level='%s',blocklist()=%d", blocker, block_level, len(blocklist))
block_level = tidyup.domain(block_level)
- logger.debug("AFTER-block_level:", block_level)
+ logger.debug("AFTER-block_level='%s'", block_level)
if block_level == "":
logger.warning("block_level is empty, blocker:", blocker)
continue
logger.debug(f"blocked='{blocked}',reason='{reason}' - AFTER!")
if blocked == "":
- logger.warning("blocked is empty:", blocker)
+ logger.warning("blocked is empty, blocker='%s'", blocker)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Some friendica servers also obscure domains without hash
row = instances.deobscure("*", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocker='{blocker}',software='{software}' - SKIPPED!")
continue
# Some obscure them with question marks, not sure if that's dependent on version or not
row = instances.deobscure("?", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocker='{blocker}',software='{software}' - SKIPPED!")
continue
logger.warning(f"blocked='{blocked}',software='{software}' is not a valid domain name - SKIPPED!")
continue
elif blocked.endswith(".arpa"):
- logger.debug(f"blocked='{blocked}' is a reverse IP domain - SKIPPED!")
+ logger.debug("blocked='%s' is a domain for reversed IP addresses - SKIPPED!", blocked)
continue
elif blocked.endswith(".tld"):
logger.debug(f"blocked='{blocked}' is a fake domain - SKIPPED!")
logger.debug("EXIT!")
def fetch_observer(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
types = [
"akoma",
"birdsitelive",
locking.acquire()
- logger.info(f"Fetching {len(types)} different table data ...")
+ logger.info("Fetching %d different table data ...", len(types))
for software in types:
doc = None
logger.debug(f"raw[{type(raw)}]()={len(raw)}")
doc = bs4.BeautifulSoup(raw, features='html.parser')
- logger.debug(f"doc[]='{type(doc)}'")
+ logger.debug("doc[]='%s'", type(doc))
except network.exceptions as exception:
logger.warning(f"Cannot fetch software='{software}' from fediverse.observer: '{type(exception)}'")
continue
items = doc.findAll("a", {"class": "url"})
- logger.info(f"Checking {len(items)} items,software='{software}' ...")
+ logger.info("Checking %d items,software='%s' ...", len(items), software)
for item in items:
- logger.debug(f"item[]='{type(item)}'")
+ logger.debug("item[]='%s'", type(item))
domain = item.decode_contents()
- logger.debug(f"domain='{domain}'")
+ logger.debug("domain='%s'", domain)
if not validators.domain(domain.split("/")[0]):
- logger.warning(f"domain='{domain}' is not a valid domain - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
continue
elif blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
elif instances.is_registered(domain):
logger.debug(f"domain='{domain}' is already registered - SKIPPED!")
logger.debug("EXIT!")
def fetch_cs(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
extensions = [
"extra",
"abbr",
logger.debug("EXIT!")
def fetch_fba_rss(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
domains = list()
logger.info(f"Fetch FBA-specific RSS args.feed='{args.feed}' ...")
domain = item.link.split("=")[1]
if blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
elif domain in domains:
logger.debug(f"domain='{domain}' is already added - SKIPPED!")
logger.debug("EXIT!")
def fetch_fbabot_atom(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
feed = "https://ryona.agency/users/fba/feed.atom"
domains = list()
for entry in atom.entries:
logger.debug(f"entry[]='{type(entry)}'")
doc = bs4.BeautifulSoup(entry.content.value, "html.parser")
- logger.debug(f"doc[]='{type(doc)}'")
+ logger.debug("doc[]='%s'", type(doc))
for element in doc.findAll("a"):
for href in element["href"].split(","):
logger.debug(f"href[{type(href)}]={href}")
domain = tidyup.domain(href)
- logger.debug(f"domain='{domain}'")
+ logger.debug("domain='%s'", domain)
if blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
elif domain in domains:
logger.debug(f"domain='{domain}' is already added - SKIPPED!")
logger.debug("EXIT!")
def fetch_instances(args: argparse.Namespace) -> int:
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
# Initial fetch
)
rows = fba.cursor.fetchall()
- logger.info(f"Checking {len(rows)} entries ...")
+ logger.info("Checking %d entries ...", len(rows))
for row in rows:
logger.debug(f"domain='{row[0]}'")
if blacklist.is_blacklisted(row[0]):
return 0
def fetch_oliphant(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
# Base URL
logger.info(f"Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
response = fba.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- logger.debug(f"response[]='{type(response)}'")
+ logger.debug("response[]='%s'", type(response))
if response.ok and response.content != "":
logger.debug(f"Fetched {len(response.content)} Bytes, parsing CSV ...")
reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
continue
if not validators.domain(domain):
- logger.warning(f"domain='{domain}' is not a valid domain name - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain name - SKIPPED!", domain)
continue
elif domain.endswith(".arpa"):
- logger.debug(f"domain='{domain}' is a reverse IP domain - SKIPPED!")
+ logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
continue
elif domain.endswith(".tld"):
- logger.debug(f"domain='{domain}' is a fake domain - SKIPPED!")
+ logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
continue
elif blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
logger.debug(f"Marking domain='{domain}' as handled")
logger.debug("EXIT!")
def fetch_txt(args: argparse.Namespace):
- logger.debug(f"args[]='{type(args)}' - CALLED!")
+ logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
# Static URLs
logger.info(f"Checking {len(urls)} text file(s) ...")
for url in urls:
- logger.debug(f"Fetching url='{url}' ...")
+ logger.debug("Fetching url='%s' ...", url)
response = fba.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- logger.debug(f"response[]='{type(response)}'")
+ logger.debug("response[]='%s'", type(response))
if response.ok and response.text != "":
logger.debug(f"Returned {len(response.text.strip())} Bytes for processing")
domains = response.text.split("\n")
logger.info(f"Processing {len(domains)} domains ...")
for domain in domains:
- logger.debug(f"domain='{domain}'")
+ logger.debug("domain='%s'", domain)
if domain == "":
logger.debug("domain is empty - SKIPPED!")
continue
elif not validators.domain(domain):
- logger.warning(f"domain='{domain}' is not a valid domain name - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain name - SKIPPED!", domain)
continue
elif domain.endswith(".arpa"):
- logger.debug(f"domain='{domain}' is a reverse IP domain - SKIPPED!")
+ logger.debug("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
continue
elif domain.endswith(".tld"):
- logger.debug(f"domain='{domain}' is a fake domain - SKIPPED!")
+ logger.debug("domain='%s' is a fake domain - SKIPPED!", domain)
continue
elif blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
- logger.debug(f"domain='{domain}'")
+ logger.debug("domain='%s'", domain)
processed = fba.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
logger.debug(f"processed='{processed}'")
return type(var) in {int, str, float, bool} or var is None
def get_hash(domain: str) -> str:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
domain = row[0]
if not validators.domain(domain.split("/")[0]):
- logger.warning(f"domain='{domain}' is not a valid domain - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
return False
elif domain.endswith(".arpa"):
logger.warning(f"domain='{domain}' is a reversed .arpa domain and should not be used generally.")
return False
elif blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
return False
elif instances.is_recent(domain):
logger.debug(f"domain='{domain}' has been recently checked - SKIPPED!")
processed = False
try:
- logger.info(f"Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
+ logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
federation.fetch_instances(domain, blocker, None, command)
processed = True
]
def is_blacklisted(domain: str) -> bool:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
_cookies[domain] = cookies
- logger.debug(f"EXIT!")
+ logger.debug("EXIT!")
def get_all(domain: str) -> dict:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
return _cookies[domain]
def has (domain: str) -> bool:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
return has_cookies
def clear (domain: str):
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.debug(f"Removing cookies for domain='{domain}' ...")
del _cookies[domain]
- logger.debug(f"EXIT!")
+ logger.debug("EXIT!")
logger.debug(f"end[{type(end)}]='{end}'")
software = software[0:end].strip()
- logger.debug(f"software[{type(software)}]='{software}'")
+ logger.debug("software[%s]='%s'", type(software), software)
software = strip_until(software, " - ")
logger.debug(f"domain='{domain}' has pending nodeinfo data, flushing ...")
instances.update_data(domain)
- logger.info(f"Checking {len(peerlist)} instances from domain='{domain}' ...")
+ logger.info("Checking %d instances from domain='%s' ...", len(peerlist), domain)
for instance in peerlist:
logger.debug(f"instance='{instance}'")
if instance is None:
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "error_message" in data:
logger.debug("Was not able to fetch peers, trying alternative ...")
data = network.get_json_api(
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "error_message" in data:
logger.warning(f"Could not reach any JSON API at domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
elif "federated_instances" in data["json"]:
(config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
)
- logger.debug(f"response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "error_message" not in data:
logger.debug("Success:", request)
instances.set_detection_mode(domain, "STATIC_CHECK")
logger.warning(f"Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
- logger.debug(f"data()={len(data)} - EXIT!")
+ logger.debug("data()=%d - EXIT!", len(data))
return data
def fetch_wellknown_nodeinfo(domain: str) -> dict:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.warning(f"components.netloc='{components.netloc}' is not a valid domain - SKIPPED!")
continue
elif domain.endswith(".arpa"):
- logger.warning(f"domain='{domain}' is a domain for reversed IP addresses - SKIPPED!")
+ logger.warning("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
continue
elif domain.endswith(".tld"):
- logger.warning(f"domain='{domain}' is a fake domain - SKIPPED!")
+ logger.warning("domain='%s' is a fake domain - SKIPPED!", domain)
continue
elif blacklist.is_blacklisted(components.netloc):
logger.debug(f"components.netloc='{components.netloc}' is blacklisted - SKIPPED!")
if isinstance(generator, bs4.element.Tag) and isinstance(generator.get("content"), str):
logger.debug("Found generator meta tag:", domain)
software = tidyup.domain(generator.get("content"))
- logger.debug(f"software[{type(software)}]='{software}'")
+ logger.debug("software[%s]='%s'", type(software), software)
if software is not None and software != "":
- logger.info(f"domain='{domain}' is generated by '{software}'")
+ logger.info("domain='%s' is generated by '%s'", domain, software)
instances.set_detection_mode(domain, "GENERATOR")
elif isinstance(site_name, bs4.element.Tag) and isinstance(site_name.get("content"), str):
logger.debug("Found property=og:site_name:", domain)
software = tidyup.domain(site_name.get("content"))
- logger.debug(f"software[{type(software)}]='{software}'")
+ logger.debug("software[%s]='%s'", type(software), software)
if software is not None and software != "":
- logger.info(f"domain='{domain}' has og:site_name='{software}'")
+ logger.info("domain='%s' has og:site_name='%s'", domain, software)
instances.set_detection_mode(domain, "SITE_NAME")
- logger.debug(f"software[]='{type(software)}'")
+ logger.debug("software[]='%s'", type(software))
if isinstance(software, str) and software == "":
logger.debug(f"Corrected empty string to None for software of domain='{domain}'")
software = None
logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
software = version.remove(software)
- logger.debug(f"software[]='{type(software)}'")
+ logger.debug("software[]='%s'", type(software))
if isinstance(software, str) and "powered by " in software:
logger.debug(f"software='{software}' has 'powered by' in it")
software = version.remove(version.strip_powered_by(software))
logger.debug(f"software='{software}' has ' see ' in it")
software = version.strip_until(software, " see ")
- logger.debug(f"software[]='{type(software)}'")
+ logger.debug("software[]='%s'", type(software))
if software == "":
logger.warning("tidyup.domain() left no software name behind:", domain)
software = None
- logger.debug(f"software[]='{type(software)}'")
+ logger.debug("software[]='%s'", type(software))
if str(software) == "":
logger.debug(f"software for '{domain}' was not detected, trying generator ...")
software = fetch_generator_from_path(domain)
logger.debug(f"software='{software}' may contain a version number, domain='{domain}', removing it ...")
software = version.remove(software)
- logger.debug(f"software[]='{type(software)}'")
+ logger.debug("software[]='%s'", type(software))
if isinstance(software, str) and "powered by" in software:
logger.debug(f"software='{software}' has 'powered by' in it")
software = version.remove(version.strip_powered_by(software))
domain = tidyup.domain(element.find("td").text)
reason = tidyup.reason(element.findAll("td")[1].text)
- logger.debug(f"domain='{domain}',reason='{reason}'")
+ logger.debug("domain='%s',reason='%s'", domain, reason)
if not validators.domain(domain.split("/")[0]):
- logger.warning(f"domain='{domain}' is not a valid domain - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
continue
elif domain.endswith(".arpa"):
- logger.warning(f"domain='{domain}' is a domain for reversed IP addresses - SKIPPED!")
+ logger.warning("domain='%s' is a domain for reversed IP addresses - SKIPPED!", domain)
continue
elif domain.endswith(".tld"):
- logger.warning(f"domain='{domain}' is a fake domain - SKIPPED!")
+ logger.warning("domain='%s' is a fake domain - SKIPPED!", domain)
continue
elif blacklist.is_blacklisted(domain):
- logger.debug(f"domain='{domain}' is blacklisted - SKIPPED!")
+ logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
continue
elif domain == "gab.com/.ai, develop.gab.com":
logger.debug("Multiple domains detected in one row")
})
continue
elif not validators.domain(domain.split("/")[0]):
- logger.warning(f"domain='{domain}' is not a valid domain - SKIPPED!")
+ logger.warning("domain='%s' is not a valid domain - SKIPPED!", domain)
continue
logger.debug(f"Adding domain='{domain}',reason='{reason}' ...")
}
try:
- logger.debug(f"Fetching url='{url}' ...")
+ logger.debug("Fetching url='%s' ...", url)
response = fba.fetch_url(url, api_headers, timeout)
json_reply["json"] = json_from_response(response)
# Maybe needs cleaning
reason = tidyup.reason(reason)
- logger.info(f"New block: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}'")
+ logger.info("New block: blocker='%s',blocked='%s',reason='%s',block_level='%s'", blocker, blocked, reason, block_level)
fba.cursor.execute(
"INSERT INTO blocks (blocker, blocked, reason, block_level, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
elif domain.endswith(".tld"):
raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
elif config.get("write_error_log").lower() != "true":
- logger.debug(f"Writing to error_log is disabled in configuruation file - EXIT!")
+ logger.debug("Writing to error_log is disabled in configuration file - EXIT!")
return
logger.debug("BEFORE error[]:", type(error))
logger.debug("EXIT!")
def has_pending(domain: str) -> bool:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
return has
def update_data(domain: str):
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.warning(f"domain='{domain}' already registered after cutting off user part. - EXIT!")
return
- logger.info(f"Adding instance domain='{domain}' (origin='{origin}',software='{software}')")
+ logger.info("Adding instance domain='%s' (origin='%s',software='%s')", domain, origin, software)
fba.cursor.execute(
"INSERT INTO instances (domain, origin, command, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
(
logger.debug("EXIT!")
def set_last_nodeinfo(domain: str):
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.debug("EXIT!")
def is_registered(domain: str) -> bool:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
elif domain.endswith(".tld"):
raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not cache.key_exists("is_registered"):
logger.debug("Cache for 'is_registered' not initialized, fetching all rows ...")
fba.cursor.execute("SELECT domain FROM instances")
return registered
def is_recent(domain: str) -> bool:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
)
row = fba.cursor.fetchone()
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.debug(f"blocked_hash='{blocked_hash}' not found, trying domain='{domain}' ...")
)
row = fba.cursor.fetchone()
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
logger.debug(f"row[]='{type(row)}' - EXIT!")
return row
def set_last_blocked(domain: str):
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.debug("EXIT!")
def set_last_instance_fetch(domain: str):
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger = logging.getLogger(__name__)
def fetch_blocks(domain: str) -> dict:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
).text,
"html.parser",
)
- logger.debug(f"doc[]='{type(doc)}'")
+ logger.debug("doc[]='%s'", type(doc))
block_tag = doc.find(id="about_blocklist")
except network.exceptions as exception:
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
logger.debug(f"Appending blocked='{blocked}',reason='{reason}'")
logger = logging.getLogger(__name__)
def fetch_peers(domain: str) -> list:
- logger.debug(f"domain({len(domain)})='{domain}',software='lemmy' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "error_message" in data:
logger.warning("Could not reach any JSON API:", domain)
instances.set_last_error(domain, data)
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug("Hash wasn't found, adding:", blocked, domain)
logger.debug("Committing changes ...")
fba.connection.commit()
except network.exceptions as exception:
- logger.warning(f"domain='{domain}',software='mastodon',exception[{type(exception)}]:'{str(exception)}'")
+ logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
instances.set_last_error(domain, exception)
logger.debug("EXIT!")
}
def fetch_blocks_from_about(domain: str) -> dict:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
"Silenced servers" : [],
}
- logger.debug(f"doc[]='{type(doc)}'")
+ logger.debug("doc[]='%s'", type(doc))
if doc is None:
logger.warning(f"Cannot fetch any /about pages for domain='{domain}' - EXIT!")
return blocklist
for header in doc.find_all("h3"):
header_text = tidyup.reason(header.text)
- logger.debug(f"header_text='{header_text}'")
+ logger.debug("header_text='%s'", header_text)
if header_text in language_mapping:
- logger.debug(f"header_text='{header_text}'")
+ logger.debug("header_text='%s'", header_text)
header_text = language_mapping[header_text]
else:
logger.warning(f"header_text='{header_text}' not found in language mapping table")
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "error_message" in data:
logger.debug(f"Was not able to fetch domain_blocks from domain='{domain}': status_code='{data['status_code']}',error_message='{data['error_message']}'")
instances.set_last_error(domain, data)
blocklist = data["json"]
if len(blocklist) > 0:
- logger.info(f"Checking {len(blocklist)} entries from domain='{domain}',software='mastodon' ...")
+ logger.info("Checking %d entries from domain='%s' ...", len(blocklist), domain)
for block in blocklist:
# Check type
logger.debug(f"block[]='{type(block)}'")
logger.debug(f"domain='{domain}' has returned zero rows, trying /about/more page ...")
rows = fetch_blocks_from_about(domain)
- logger.info(f"Checking {len(rows.items())} entries from domain='{domain}',software='mastodon' ...")
+ logger.info("Checking %d entries from domain='%s' ...", len(rows.items()), domain)
for block_level, blocklist in rows.items():
logger.debug("domain,block_level,blocklist():", domain, block_level, len(blocklist))
block_level = tidyup.domain(block_level)
logger.debug(f"domain='{domain}' skipping block_level='accept'")
continue
- logger.debug(f"Checking {len(blocklist)} entries from domain='{domain}',software='mastodon',block_level='{block_level}' ...")
+ logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(blocklist), domain, block_level)
for block in blocklist:
logger.debug(f"block[]='{type(block)}'")
blocked, blocked_hash, reason = block.values()
logger.debug(f"blocked='{blocked}',reason='{reason}' - AFTER!")
if blocked == "":
- logger.warning("blocked is empty:", domain)
+ logger.warning("blocked is empty, domain='%s'", domain)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Doing the hash search for instance names as well to tidy up DB
row = instances.deobscure("*", blocked, blocked_hash)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
# Doing the hash search for instance names as well to tidy up DB
row = instances.deobscure("?", blocked, blocked_hash)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
continue
logger.debug("Looking up instance by domain:", blocked)
if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='mastodon' is not a valid domain name - SKIPPED!")
+ logger.warning("blocked='%s' is not a valid domain name - SKIPPED!", blocked)
continue
elif blocked.endswith(".arpa"):
logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
logger.debug("Looking up instance by domain:", blocked)
if not validators.domain(blocked):
- logger.warning(f"blocked='{blocked}',software='mastodon' is not a valid domain name - SKIPPED!")
+ logger.warning("blocked='%s' is not a valid domain name - SKIPPED!", blocked)
continue
elif blocked.endswith(".arpa"):
logger.warning(f"blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug("Hash wasn't found, adding:", blocked, domain)
logger.debug("Committing changes ...")
fba.connection.commit()
except network.exceptions as exception:
- logger.warning(f"domain='{domain}',software='mastodon',exception[{type(exception)}]:'{str(exception)}'")
+ logger.warning("domain='%s',exception[%s]:'%s'", domain, type(exception), str(exception))
instances.set_last_error(domain, exception)
logger.debug("EXIT!")
logger = logging.getLogger(__name__)
def fetch_peers(domain: str) -> list:
- logger.debug(f"domain({len(domain)})='{domain}' - CALLED!")
+ logger.debug("domain(%d)='%s' - CALLED!", len(domain), domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
}), headers)
# Check records
- logger.debug(f"fetched[]='{type(fetched)}'")
+ logger.debug("fetched[]='%s'", type(fetched))
if "error_message" in fetched:
logger.warning(f"post_json_api() for domain='{domain}' returned error message: {fetched['error_message']}")
instances.set_last_error(domain, fetched)
return peers
def fetch_blocks(domain: str) -> dict:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain='%s' - CALLED!", domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
"offset" : offset - 1
}), headers)
- logger.debug(f"fetched[]='{type(fetched)}'")
+ logger.debug("fetched[]='%s'", type(fetched))
if "error_message" in fetched:
logger.warning(f"post_json_api() for domain='{domain}' returned error message: {fetched['error_message']}")
instances.set_last_error(domain, fetched)
"offset" : offset - 1
}), headers)
- logger.debug(f"fetched[]='{type(fetched)}'")
+ logger.debug("fetched[]='%s'", type(fetched))
if "error_message" in fetched:
logger.warning(f"post_json_api() for domain='{domain}' returned error message: {fetched['error_message']}")
instances.set_last_error(domain, fetched)
(config.get("connection_timeout"), config.get("read_timeout"))
)
- logger.debug(f"data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "error_message" not in data:
logger.debug(f"Success, data[json]()={len(data['json'])}")
if "data" in data["json"]:
data = rows["metadata"]["federation"]
found = False
- logger.debug(f"data[]='{type(data)}'")
+ logger.debug("data[]='%s'", type(data))
if "mrf_simple" in data:
logger.debug("Found mrf_simple:", domain)
found = True
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Obscured domain name with no hash
row = instances.deobscure("*", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
# Obscured domain name with no hash
row = instances.deobscure("?", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
# Commit changes
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Obscured domain name with no hash
row = instances.deobscure("*", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
# Obscured domain name with no hash
row = instances.deobscure("?", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
# Commit changes
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Obscured domain name with no hash
row = instances.deobscure("*", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
# Obscured domain name with no hash
row = instances.deobscure("?", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Obscured domain name with no hash
row = instances.deobscure("*", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
# Obscured domain name with no hash
row = instances.deobscure("?", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
logger.warning(f"blocked='{blocked}' is a fake domain, please don't crawl them!")
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - SKIPPED!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
logger.debug(f"blocklist()={len(blocklist)}")
if len(blocklist) > 0:
- logger.info(f"Checking {len(blocklist)} record(s) ...")
+ logger.info("Checking %d record(s) ...", len(blocklist))
for block_level in blocklist:
logger.debug(f"block_level='{block_level}'")
rows = blocklist[block_level]
logger.warning("blocked is empty after tidyup.domain():", domain, block_level)
continue
elif blacklist.is_blacklisted(blocked):
- logger.debug(f"blocked='{blocked}' is blacklisted - skipping!")
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
continue
elif blocked.count("*") > 0:
# Obscured domain name with no hash
row = instances.deobscure("*", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
# Obscured domain name with no hash
row = instances.deobscure("?", blocked)
- logger.debug(f"row[]='{type(row)}'")
+ logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
continue
logger.debug("EXIT!")
def fetch_blocks_from_about(domain: str) -> dict:
- logger.debug(f"domain='{domain}' - CALLED!")
+ logger.debug("domain='%s' - CALLED!", domain)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
"html.parser",
)
- logger.debug(f"doc[]='{type(doc)}'")
+ logger.debug("doc[]='%s'", type(doc))
if doc.find("h2") is not None:
logger.debug(f"Found 'h2' header in path='{path}' - BREAK!")
break
"Silenced servers" : [],
}
- logger.debug(f"doc[]='{type(doc)}'")
+ logger.debug("doc[]='%s'", type(doc))
if doc is None:
logger.warning(f"Cannot fetch any /about pages for domain='{domain}' - EXIT!")
return blocklist