_PARSER = None
def init_parser():
- logger.debug("init_parser(): CALLED!")
+ logger.debug("CALLED!")
global _PARSER
logger.debug("Initializing parser ...")
)
parser.set_defaults(command=commands.fetch_fedipact)
- logger.debug("init_parser(): EXIT!")
+ logger.debug("EXIT!")
def run_command():
- logger.debug("run_command(): CALLED!")
+ logger.debug("CALLED!")
args = _PARSER.parse_args()
if args.log_level is not None:
for _logger in loggers:
_logger.setLevel(args.log_level)
- logger.debug(f"args[{type(args)}]={args}")
+ logger.debug("args[%s]='%s'", type(args), args)
status = args.command(args)
- logger.debug("status={status} - EXIT!")
- return status if isinstance(status, int) else 0
+
+ logger.debug("status=%d - EXIT!", status)
+ return status
def shutdown():
logger.debug("Closing database connection ...")
else:
logger.info("args.domain='%s' is not known", args.domain)
- logger.debug(f"status={status} - EXIT!")
+ logger.debug("status='%d' - EXIT!", status)
return status
def fetch_bkali(args: argparse.Namespace) -> int:
rows = fetched["json"]
- logger.debug(f"rows({len(rows)})[]='{type(rows)}'")
+ logger.debug("rows(%d)[]='%s'", len(rows), type(rows))
if len(rows) == 0:
raise Exception("WARNING: Returned no records")
elif "data" not in rows:
raise Exception(f"WARNING: rows()={len(rows['data'])} does not contain key 'nodeinfo'")
for entry in rows["data"]["nodeinfo"]:
- logger.debug(f"entry['{type(entry)}']='{entry}'")
+ logger.debug("entry[%s]='%s'", type(entry), entry)
if "domain" not in entry:
logger.warning("entry()=%d does not contain 'domain' - SKIPPED!", len(entry))
continue
logger.debug("domain='%s' is already registered - SKIPPED!", entry['domain'])
continue
- logger.debug(f"Adding domain='{entry['domain']}' ...")
+ logger.debug("Adding domain='%s' ...", entry['domain'])
domains.append(entry["domain"])
except network.exceptions as exception:
- logger.error(f"Cannot fetch graphql,exception[{type(exception)}]:'{str(exception)}' - EXIT!")
+ logger.warning("Cannot fetch graphql,exception[%s]:'%s' - EXIT!", type(exception), str(exception))
return 102
- logger.debug(f"domains()={len(domains)}")
+ logger.debug("domains()=%d", len(domains))
if len(domains) > 0:
locking.acquire()
logger.debug("Success - EXIT!")
return 0
-def fetch_blocks(args: argparse.Namespace):
+def fetch_blocks(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
if args.domain is not None and args.domain != "":
- logger.debug(f"args.domain='{args.domain}' - checking ...")
+ logger.debug("args.domain='%s' - checking ...", args.domain)
if not validators.domain(args.domain):
logger.warning("args.domain='%s' is not valid.", args.domain)
- return
+ return 100
elif blacklist.is_blacklisted(args.domain):
logger.warning("args.domain='%s' is blacklisted, won't check it!", args.domain)
- return
+ return 101
elif not instances.is_registered(args.domain):
logger.warning("args.domain='%s' is not registered, please run ./utils.py fetch_instances '%s' first.", args.domain, args.domain)
- return
+ return 102
locking.acquire()
if config.get("bot_enabled") and len(blockdict) > 0:
network.send_bot_post(blocker, blockdict)
- logger.debug(f"Invoking cookies.clear({blocker}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", blocker)
cookies.clear(blocker)
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
-def fetch_observer(args: argparse.Namespace):
+def fetch_observer(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
types = [
"akoma",
doc = None
try:
- logger.debug(f"Fetching table data for software='{software}' ...")
+ logger.debug("Fetching table data for software='%s' ...", software)
raw = utils.fetch_url(f"https://fediverse.observer/app/views/tabledata.php?software={software}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
logger.debug("raw[%s]()=%d", type(raw), len(raw))
logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
-def fetch_todon_wiki(args: argparse.Namespace):
+def fetch_todon_wiki(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
logger.debug("Invoking commit() ...")
database.connection.commit()
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
def fetch_cs(args: argparse.Namespace):
logger.debug("args[]='%s' - CALLED!", type(args))
logger.debug("raw()=%d,raw[]='%s'", len(raw), type(raw))
doc = bs4.BeautifulSoup(markdown.markdown(raw, extensions=extensions), features='html.parser')
- logger.debug(f"doc()={len(doc)}[]='{type(doc)}'")
+ logger.debug("doc()=%d[]='%s'", len(doc), type(doc))
silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table").find("tbody")
logger.debug("silenced[%s]()=%d", type(silenced), len(silenced))
logger.info("block_level='%s' has %d row(s)", block_level, len(domains[block_level]))
for row in domains[block_level]:
- logger.debug(f"row='{row}'")
+ logger.debug("row[%s]='%s'", type(row), row)
if not instances.is_registered(row["domain"]):
try:
logger.info("Fetching instances from domain='%s' ...", row["domain"])
logger.debug("Invoking commit() ...")
database.connection.commit()
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
-def fetch_fba_rss(args: argparse.Namespace):
+def fetch_fba_rss(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
domains = list()
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
- logger.debug(f"Parsing RSS feed ({len(response.text)} Bytes) ...")
+ logger.debug("Parsing RSS feed (%d Bytes) ...", len(response.text))
rss = atoma.parse_rss_bytes(response.content)
- logger.debug(f"rss[]='{type(rss)}'")
+ logger.debug("rss[]='%s'", type(rss))
for item in rss.items:
- logger.debug(f"item={item}")
+ logger.debug("item='%s'", item)
domain = item.link.split("=")[1]
if blacklist.is_blacklisted(domain):
logger.debug("domain='%s' is already registered - SKIPPED!", domain)
continue
- logger.debug(f"Adding domain='{domain}'")
+ logger.debug("Adding domain='%s'", domain)
domains.append(domain)
- logger.debug(f"domains()={len(domains)}")
+ logger.debug("domains()=%d", len(domains))
if len(domains) > 0:
locking.acquire()
logger.warning("Exception '%s' during fetching instances (fetch_fba_rss) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
-def fetch_fbabot_atom(args: argparse.Namespace):
+def fetch_fbabot_atom(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
feed = "https://ryona.agency/users/fba/feed.atom"
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
- logger.debug(f"Parsing ATOM feed ({len(response.text)} Bytes) ...")
+ logger.debug("Parsing ATOM feed (%d Bytes) ...", len(response.text))
atom = atoma.parse_atom_bytes(response.content)
- logger.debug(f"atom[]='{type(atom)}'")
+ logger.debug("atom[]='%s'", type(atom))
for entry in atom.entries:
- logger.debug(f"entry[]='{type(entry)}'")
+ logger.debug("entry[]='%s'", type(entry))
doc = bs4.BeautifulSoup(entry.content.value, "html.parser")
logger.debug("doc[]='%s'", type(doc))
for element in doc.findAll("a"):
domain = tidyup.domain(href)
logger.debug("domain='%s'", domain)
- if blacklist.is_blacklisted(domain):
- logger.debug("domain='%s' is blacklisted - SKIPPED!", domain)
+ if not utils.is_domain_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif domain in domains:
logger.debug("domain='%s' is already added - SKIPPED!", domain)
logger.debug("domain='%s' is already registered - SKIPPED!", domain)
continue
- logger.debug(f"Adding domain='{domain}',domains()={len(domains)}")
+ logger.debug("Adding domain='%s',domains()=%d", domain, len(domains))
domains.append(domain)
- logger.debug(f"domains({len(domains)})={domains}")
+ logger.debug("domains(%d)='%s", len(domains), domains)
if len(domains) > 0:
locking.acquire()
logger.warning("Exception '%s' during fetching instances (fetch_fbabot_atom) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
def fetch_instances(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
logger.info("Fetching instances from args.domain='%s' ...", args.domain)
federation.fetch_instances(args.domain, None, None, inspect.currentframe().f_code.co_name)
- logger.debug(f"Invoking cookies.clear({args.domain}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", args.domain)
cookies.clear(args.domain)
except network.exceptions as exception:
logger.warning("Exception '%s' during fetching instances (fetch_instances) from args.domain='%s'", type(exception), args.domain)
for row in rows:
logger.debug("domain='%s'", row[0])
if blacklist.is_blacklisted(row[0]):
- logger.warning("domain is blacklisted: row[0]='%s'", row[0])
+ logger.warning("Domain is blacklisted: row[0]='%s'", row[0])
continue
try:
logger.info("Fetching instances for instance domain='%s',software='%s',origin='%s',nodeinfo_url='%s'", row[0], row[2], row[1], row[3])
federation.fetch_instances(row[0], row[1], row[2], inspect.currentframe().f_code.co_name, row[3])
- logger.debug(f"Invoking cookies.clear({row[0]}) ...")
+ logger.debug("Invoking cookies.clear(%s) ...", row[0])
cookies.clear(row[0])
except network.exceptions as exception:
logger.warning("Exception '%s' during fetching instances (fetch_instances) from row[0]='%s'", type(exception), row[0])
logger.debug("Success - EXIT!")
return 0
-def fetch_oliphant(args: argparse.Namespace):
+def fetch_oliphant(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
logger.debug("reader[]='%s'", type(reader))
for row in reader:
+ logger.debug("row[%s]='%s'", type(row), row)
domain = None
if "#domain" in row:
domain = row["#domain"]
elif "domain" in row:
domain = row["domain"]
else:
- logger.debug(f"row='{row}' does not contain domain column")
+ logger.debug("row='%s' does not contain domain column", row)
continue
logger.debug("domain='%s'", domain)
logger.debug("processed='%s'", processed)
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
-def fetch_txt(args: argparse.Namespace):
+def fetch_txt(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
logger.debug("processed='%s'", processed)
if not processed:
- logger.debug(f"domain='{domain}' was not generically processed - SKIPPED!")
+ logger.debug("domain='%s' was not generically processed - SKIPPED!", domain)
continue
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
-def fetch_fedipact(args: argparse.Namespace):
+def fetch_fedipact(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
locking.acquire()
logger.info("Fetching domain='%s' ...", domain)
federation.fetch_instances(domain, None, None, inspect.currentframe().f_code.co_name)
- logger.debug("EXIT!")
+ logger.debug("Success! - EXIT!")
+ return 0
logger = logging.getLogger(__name__)
def determine(domain: str, headers: dict) -> dict:
- logger.debug(f"domain='{domain}',headers()={len(headers)} - CALLED!")
+ logger.debug("domain='%s',headers()=%d - CALLED!", domain, len(headers))
domain_helper.raise_on(domain)
+
if not isinstance(headers, dict):
raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
reqheaders = headers
# Fetch / to check for a meta tag indicating CSRF
- logger.debug(f"Fetching / from domain='{domain}' for CSRF check ...")
+ logger.debug("Fetching / from domain='%s' for CSRF check ...", domain)
response = reqto.get(
f"https://{domain}/",
headers=network.web_headers,
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code < 300 and response.text != "" and response.text.find("<html") > 0:
# Save cookies
- logger.debug(f"Parsing response.text()={len(response.text)} Bytes ...")
+ logger.debug("Parsing response.text()=%d Bytes ...", len(response.text))
cookies.store(domain, response.cookies.get_dict())
# Parse text
response.text,
"html.parser"
)
- logger.debug(f"meta[]='{type(meta)}'")
+ logger.debug("meta[]='%s'", type(meta))
tag = meta.find("meta", attrs={"name": "csrf-token"})
- logger.debug(f"tag={tag}")
+ logger.debug("tag='%s'", tag)
if tag is not None:
- logger.debug(f"Adding CSRF token='{tag['content']}' for domain='{domain}'")
+ logger.debug("Adding CSRF token='%s' for domain='%s'", tag['content'], domain)
reqheaders["X-CSRF-Token"] = tag["content"]
- logger.debug(f"reqheaders()={len(reqheaders)} - EXIT!")
+ logger.debug("reqheaders()=%d - EXIT!", len(reqheaders))
return reqheaders
domain_helper.raise_on(domain)
if has(domain):
- logger.debug(f"Removing cookies for domain='{domain}' ...")
+ logger.debug("Removing cookies for domain='%s' ...", domain)
del _cookies[domain]
logger.debug("EXIT!")
logger.debug("CALLED!")
try:
- logger.debug(f"Acquiring lock: '{lockfile}'")
+ logger.debug("Acquiring lock: lockfile='%s'", lockfile)
LOCK = zc.lockfile.LockFile(lockfile)
logger.debug("Lock obtained.")
except zc.lockfile.LockError:
- logger.error(f"Cannot aquire lock: '{lockfile}'")
+ logger.error("Cannot aquire lock: lockfile='%s'", lockfile)
sys.exit(100)
logger.debug("EXIT!")
if LOCK is not None:
logger.debug("Releasing lock ...")
LOCK.close()
- logger.debug(f"Deleting lockfile='{lockfile}' ...")
+ logger.debug("Deleting lockfile='%s' ...", lockfile)
os.remove(lockfile)
logger.debug("EXIT!")
logger = logging.getLogger(__name__)
def reason(string: str) -> str:
- logger.debug(f"string='{string}' - CALLED!")
+ logger.debug("string='%s' - CALLED!", string)
if not isinstance(string, str):
raise ValueError(f"Parameter string[]='{type(string)}' is not 'str'")
# Strip string
string = string.strip()
- logger.debug(f"string='{string}' - EXIT!")
+ logger.debug("string='%s' - EXIT!", string)
return string
def domain(string: str) -> str:
- logger.debug(f"string='{string}' - CALLED!")
+ logger.debug("string='%s' - CALLED!", string)
if not isinstance(string, str):
raise ValueError(f"Parameter string[]='{type(string)}' is not 'str'")
# Lower-case, strip whitespace and remove any trailing dot
string = string.lower().strip().rstrip(".")
- logger.debug(f"string='{string}' - #1")
+ logger.debug("string='%s' - #1", string)
# No port number
string = re.sub("\:\d+$", "", string)
- logger.debug(f"string='{string}' - #2")
+ logger.debug("string='%s' - #2", string)
# No protocol, sometimes without the slashes
string = re.sub("^https?\:(\/*)", "", string)
- logger.debug(f"string='{string}' - #3")
+ logger.debug("string='%s' - #3", string)
# No trailing slash
string = re.sub("\/$", "", string)
- logger.debug(f"string='{string}' - #4")
+ logger.debug("string='%s' - #4", string)
# No @ or : sign
string = re.sub("^\@", "", string)
string = string.split(":")[0]
- logger.debug(f"string='{string}' - #4")
+ logger.debug("string='%s' - #5", string)
# No individual users in block lists
string = re.sub("(.+)\@", "", string)
- logger.debug(f"string='{string}' - #5")
+ logger.debug("string='%s' - #6", string)
+
if string.find("/profile/"):
string = string.split("/profile/")[0]
elif string.find("/users/"):
string = string.split("/users/")[0]
- logger.debug(f"string='{string}' - EXIT!")
+ logger.debug("string='%s' - EXIT!", string)
return string
elif " - " in software:
temp = software.split(" - ")[0]
- logger.debug(f"software='{software}'")
+ logger.debug("software='%s'", software)
version = None
if " " in software:
version = temp.split(" ")[-1]
elif "-" in software:
version = temp.split("-")[-1]
else:
- logger.debug(f"Was not able to find common seperator, returning untouched software='{software}'")
+ logger.debug("Was not able to find common seperator, returning untouched software='%s' - EXIT!", software)
return software
match = None
- logger.debug(f"Checking {len(patterns)} patterns ...")
+ logger.debug("Checking %d patterns ...", len(patterns))
for pattern in patterns:
# Run match()
match = pattern.match(version)
- logger.debug(f"match[]='{type(match)}'")
+ logger.debug("match[]='%s'", type(match))
if isinstance(match, re.Match):
- logger.debug(f"version='{version}' is matching pattern='{pattern}'")
+ logger.debug("version='%s' is matching pattern='%s'", version, pattern)
break
- logger.debug(f"version[{type(version)}]='{version}',match='{match}'")
+ logger.debug("version[%s]='%s',match='%s'", type(version), version, match)
if not isinstance(match, re.Match):
logger.warning("version='%s' does not match regex, leaving software='%s' untouched.", version, software)
return software
- logger.debug(f"Found valid version number: '{version}', removing it ...")
+ logger.debug("Found valid version number: '%s', removing it ...", version)
end = len(temp) - len(version) - 1
- logger.debug(f"end[{type(end)}]={end}")
+ logger.debug("end[%s]='%s'", type(end), end)
software = temp[0:end].strip()
if " version" in software:
- logger.debug(f"software='{software}' contains word ' version'")
+ logger.debug("software='%s' contains word ' version'", software)
software = strip_until(software, " version")
logger.debug("software='%s' - EXIT!", software)
return software
start = software.find("powered by ")
- logger.debug(f"start[{type(start)}]='{start}'")
+ logger.debug("start[%s]='%d'", type(start), start)
software = software[start + 11:].strip()
- logger.debug(f"software='{software}'")
+ logger.debug("software='%s'", software)
software = strip_until(software, " - ")
return software
end = software.find("hosted on ")
- logger.debug(f"end[{type(end)}]='{end}'")
+ logger.debug("end[%s]='%d'", type(end), end)
software = software[0:end].strip()
logger.debug("software[%s]='%s'", type(software), software)
return software
def strip_until(software: str, until: str) -> str:
- logger.debug(f"software='{software}',until='{until}' - CALLED!")
+ logger.debug("software='%s',until='%s' - CALLED!", software, until)
if not isinstance(software, str):
raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
elif software == "":
# Next, strip everything from the "until" marker onwards
end = software.find(until)
- logger.debug(f"end[{type(end)}]='{end}'")
+ logger.debug("end[%s]='%d'", type(end), end)
if end > 0:
software = software[0:end].strip()
]
def fetch_instances(domain: str, origin: str, software: str, command: str, path: str = None):
- logger.debug(f"domain='{domain}',origin='{origin}',software='{software}',path='{path}' - CALLED!")
+ logger.debug("domain='%s',origin='%s',software='%s',path='%s' - CALLED!", domain, origin, software, path)
domain_helper.raise_on(domain)
if not isinstance(origin, str) and origin is not None:
raise ValueError(f"Parameter origin[]='{type(origin)}' is not 'str'")
elif software is None:
- logger.debug(f"Updating last_instance_fetch for domain='{domain}' ...")
+ logger.debug("Updating last_instance_fetch for domain='%s' ...", domain)
instances.set_last_instance_fetch(domain)
- logger.debug(f"software for domain='{domain}' is not set, determining ...")
- software = None
try:
+ logger.debug("Software for domain='%s' is not set, determining ...", domain)
software = determine_software(domain, path)
except network.exceptions as exception:
logger.warning("Exception '%s' during determining software type", type(exception))
instances.set_last_error(domain, exception)
- logger.debug(f"Determined software='{software}' for domain='{domain}'")
+ logger.debug("Determined software='%s' for domain='%s'", software, domain)
elif not isinstance(software, str):
raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
elif not isinstance(command, str):
raise ValueError("Parameter 'command' is empty")
if not instances.is_registered(domain):
- logger.debug(f"Adding new domain='{domain}',origin='{origin}',command='{command}',path='{path}',software='{software}'")
+ logger.debug("Adding new domain='%s',origin='%s',command='%s',path='%s',software='%s'", domain, origin, command, path, software)
instances.add(domain, origin, command, path, software)
- logger.debug(f"Updating last_instance_fetch for domain='{domain}' ...")
+ logger.debug("Updating last_instance_fetch for domain='%s' ...", domain)
instances.set_last_instance_fetch(domain)
logger.debug("Fetching instances for domain='%s',software='%s'", domain, software)
logger.warning("Cannot fetch peers: domain='%s'", domain)
return
elif instances.has_pending(domain):
- logger.debug(f"domain='{domain}' has pending nodeinfo data, flushing ...")
+ logger.debug("domain='%s' has pending nodeinfo data, flushing ...", domain)
instances.update_data(domain)
logger.info("Checking %d instances from domain='%s' ...", len(peerlist), domain)
for instance in peerlist:
- logger.debug(f"instance='{instance}'")
+ logger.debug("instance='%s'", instance)
if instance is None:
# Skip "None" types as tidup.domain() cannot parse them
continue
- logger.debug(f"instance='{instance}' - BEFORE")
+ logger.debug("instance='%s' - BEFORE", instance)
instance = tidyup.domain(instance)
- logger.debug(f"instance='{instance}' - AFTER")
+ logger.debug("instance='%s' - AFTER", instance)
if instance == "":
logger.warning("Empty instance after tidyup.domain(), domain='%s'", domain)
logger.debug("EXIT!")
def fetch_peers(domain: str, software: str) -> list:
- logger.debug(f"domain({len(domain)})='{domain}',software='{software}' - CALLED!")
+ logger.debug("domain='%s',software='%s' - CALLED!", domain, software)
domain_helper.raise_on(domain)
if not isinstance(software, str) and software is not None:
raise ValueError(f"software[]='{type(software)}' is not 'str'")
if software == "misskey":
- logger.debug(f"Invoking misskey.fetch_peers({domain}) ...")
+ logger.debug("Invoking misskey.fetch_peers(%s) ...", domain)
return misskey.fetch_peers(domain)
elif software == "lemmy":
- logger.debug(f"Invoking lemmy.fetch_peers({domain}) ...")
+ logger.debug("Invoking lemmy.fetch_peers(%s) ...", domain)
return lemmy.fetch_peers(domain)
elif software == "peertube":
- logger.debug(f"Invoking peertube.fetch_peers({domain}) ...")
+ logger.debug("Invoking peertube.fetch_peers(%s) ...", domain)
return peertube.fetch_peers(domain)
# Init peers variable
instances.set_last_error(domain, exception)
return peers
- logger.debug(f"Fetching peers from '{domain}',software='{software}' ...")
+ logger.debug("Fetching peers from domain='%s',software='%s' ...", domain, software)
data = network.get_json_api(
domain,
"/api/v1/instance/peers",
logger.debug("Fetching nodeinfo from domain='%s' ...", domain)
nodeinfo = fetch_wellknown_nodeinfo(domain)
- logger.debug("nodeinfo[%s]({len(nodeinfo)}='%s'", type(nodeinfo), nodeinfo)
+ logger.debug("nodeinfo[%s](%d='%s'", type(nodeinfo), len(nodeinfo), nodeinfo)
if "error_message" not in nodeinfo and "json" in nodeinfo and len(nodeinfo["json"]) > 0:
logger.debug("Found nodeinfo[json]()=%d - EXIT!", len(nodeinfo['json']))
return nodeinfo["json"]
logger.debug("Corrected empty string to None for software of domain='%s'", domain)
software = None
elif isinstance(software, str) and ("." in software or " " in software):
- logger.debug("software='%s' may contain a version number, domain='{domain}', removing it ...", software)
+ logger.debug("software='%s' may contain a version number, domain='%s', removing it ...", software, domain)
software = version.remove(software)
logger.debug("software[]='%s'", type(software))
logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
if not is_registered(domain):
- logger.debug(f"domain='{domain}' is not registered, returning False - EXIT!")
+ logger.debug("domain='%s' is not registered, returning False - EXIT!", domain)
return False
# Query database
logger.debug("EXIT!")
def set_total_peers(domain: str, peers: list):
- logger.debug(f"domain='{domain}',peers()={len(peers)} - CALLED!")
+ logger.debug("domain='%s',peers()=%d - CALLED!", domain, len(peers))
domain_helper.raise_on(domain)
+
if not isinstance(peers, list):
raise ValueError(f"Parameter peers[]='{type(peers)}' is not 'list'")
logger.debug("EXIT!")
def set_nodeinfo_url(domain: str, url: str):
- logger.debug(f"domain='{domain}',url='{url}' - CALLED!")
+ logger.debug("domain='%s',url='%s' - CALLED!", domain, url)
domain_helper.raise_on(domain)
if not isinstance(url, str):
logger.debug("EXIT!")
def set_detection_mode(domain: str, mode: str):
- logger.debug(f"domain='{domain}',mode='{mode}' - CALLED!")
+ logger.debug("domain='%s',mode='%s' - CALLED!", domain, mode)
domain_helper.raise_on(domain)
if not isinstance(mode, str):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
- logger.debug(f"Appending blocked='{blocked}',reason='{reason}'")
+ logger.debug("Appending blocked='%s',reason='%s'", blocked, reason)
blocklist.append({
"blocker" : domain,
"blocked" : tidyup.domain(blocked),
doc = None
for path in ["/about/more", "/about"]:
try:
- logger.debug(f"Fetching path='{path}' from domain='{domain}' ...")
+ logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
doc = bs4.BeautifulSoup(
network.fetch_response(
domain,
)
if len(doc.find_all("h3")) > 0:
- logger.debug(f"path='{path}' had some headlines - BREAK!")
+ logger.debug("path='%s' had some headlines - BREAK!", path)
break
except network.exceptions as exception:
# Check type
logger.debug("block[]='%s'", type(block))
if not isinstance(block, dict):
- logger.debug(f"block[]='{type(block)}' is of type 'dict' - SKIPPED!")
+ logger.debug("block[]='%s' is of type 'dict' - SKIPPED!", type(block))
continue
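# Use the block comment as reason only when it is present and non-empty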
reason = tidyup.reason(block["comment"]) if "comment" in block and block['comment'] is not None and block['comment'] != "" else None
already = 0
logger.debug("rows(%d))[]='%s'", len(rows), type(rows))
for row in rows:
- logger.debug(f"row()={len(row)}")
+ logger.debug("row()=%d", len(row))
if "host" not in row:
logger.warning("row()=%d does not contain key 'host': row='%s',domain='%s' - SKIPPED!", len(row), row, domain)
continue
logger.warning("row[host][]='%s' is not 'str' - SKIPPED!", type(row['host']))
continue
elif not utils.is_domain_wanted(row["host"]):
- logger.debug("row[host]='%s' is not wanted, domain='{domain}' - SKIPPED!", row['host'])
+ logger.debug("row[host]='%s' is not wanted, domain='%s' - SKIPPED!", row['host'], domain)
continue
elif row["host"] in peers:
logger.debug("Not adding row[host]='%s', already found - SKIPPED!", row['host'])
already = already + 1
continue
- logger.debug("Adding peer: '%s'", row['host'])
+ logger.debug("Adding peer: row[host]='%s'", row['host'])
peers.append(row["host"])
if already == len(rows):
logger.debug("Returned zero bytes, domain='%s' - BREAK!", domain)
break
elif len(rows) != config.get("misskey_limit"):
- logger.debug("Fetched %d row(s) but expected: '{config.get('misskey_limit')}'", len(rows))
+ logger.debug("Fetched %d row(s) but expected: %d'", len(rows), config.get('misskey_limit'))
offset = offset + (config.get("misskey_limit") - len(rows))
else:
logger.debug("Raising offset by step='%d'", step)
logger = logging.getLogger(__name__)
def fetch_peers(domain: str) -> list:
- logger.debug(f"domain({len(domain)})='{domain}',software='peertube' - CALLED!")
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- logger.debug(f"domain='{domain}' is a PeerTube, fetching JSON ...")
+ logger.debug("domain='%s' is a PeerTube, fetching JSON ...", domain)
peers = list()
start = 0
return list()
for mode in ["followers", "following"]:
- logger.debug(f"domain='{domain}',mode='{mode}'")
+ logger.debug("domain='%s',mode='%s'", domain, mode)
while True:
data = network.get_json_api(
domain,
logger.debug("data[]='%s'", type(data))
if "error_message" not in data:
- logger.debug(f"Success, data[json]()={len(data['json'])}")
+ logger.debug("Success, data[json]()=%d", len(data['json']))
if "data" in data["json"]:
rows = data["json"]["data"]
- logger.debug(f"Found {len(rows)} record(s).")
+ logger.debug("Found %d record(s).", len(rows))
for record in rows:
- logger.debug(f"record()={len(record)}")
+ logger.debug("record()=%d", len(record))
for mode2 in ["follower", "following" ]:
- logger.debug(f"mode2='{mode2}'")
+ logger.debug("mode2='%s'", mode2)
if mode2 in record and "host" in record[mode2]:
- logger.debug(f"Found host='{record[mode2]['host']}', adding ...")
+ logger.debug("Found mode2='%s',host='%s', adding ...", mode2, record[mode2]['host'])
peers.append(record[mode2]["host"])
else:
logger.warning("Record from domain='%s' has no mode2='%s' or 'host' record[]='%s", domain, mode2, type(record))
blockdict = list()
rows = None
try:
- logger.debug(f"Fetching nodeinfo: domain='{domain}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug("Fetching nodeinfo: domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
rows = federation.fetch_nodeinfo(domain, nodeinfo_url)
except network.exceptions as exception:
logger.warning("Exception '%s' during fetching nodeinfo from domain='%s'", type(exception), domain)