logger.debug("fetched[]='%s'", type(fetched))
if "error_message" in fetched:
- logger.warning(f"post_json_api() for 'gql.api.bka.li' returned error message: {fetched['error_message']}")
+ logger.warning("post_json_api() for 'gql.api.bka.li' returned error message='%s", fetched['error_message'])
return 100
elif isinstance(fetched["json"], dict) and "error" in fetched["json"] and "message" in fetched["json"]["error"]:
- logger.warning(f"post_json_api() returned error: {fetched['error']['message']}")
+ logger.warning("post_json_api() returned error: '%s", fetched['error']['message'])
return 101
rows = fetched["json"]
logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_bkali) from domain='{domain}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_bkali) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
logger.debug("Success - EXIT!")
if args.domain is not None and args.domain != "":
logger.debug(f"args.domain='{args.domain}' - checking ...")
if not validators.domain(args.domain):
- logger.warning(f"domain='{args.domain}' is not valid.")
+ logger.warning("args.domain='%s' is not valid.", args.domain)
return
elif blacklist.is_blacklisted(args.domain):
- logger.warning(f"domain='{args.domain}' is blacklisted, won't check it!")
+ logger.warning("args.domain='%s' is blacklisted, won't check it!", args.domain)
return
elif not instances.is_registered(args.domain):
- logger.warning(f"domain='{args.domain}' is not registered, please run ./utils.py fetch_instances {args.domain} first.")
+ logger.warning("args.domain='%s' is not registered, please run ./utils.py fetch_instances '%s' first.", args.domain, args.domain)
return
locking.acquire()
elif nodeinfo_url is None or nodeinfo_url == "":
logger.debug(f"blocker='{blocker}',software='{software}' has empty nodeinfo_url")
continue
- elif blacklist.is_blacklisted(blocker):
- logger.warning(f"blocker='{blocker}' is blacklisted now!")
+ elif not utils.is_domain_wanted(blocker):
+ logger.warning("blocker='%s' is not wanted - SKIPPED!", blocker)
continue
logger.debug(f"blocker='{blocker}'")
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocker='{blocker}',software='{software}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',blocker='%s',software='%s' - SKIPPED!", blocked, blocker, software)
continue
blocked = row[0]
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocker='{blocker}',software='{software}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',blocker='%s',software='%s' - SKIPPED!", blocked, blocker, software)
continue
blocked = row[0]
try:
logger.debug(f"Fetching table data for software='{software}' ...")
raw = utils.fetch_url(f"https://fediverse.observer/app/views/tabledata.php?software={software}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
- logger.debug(f"raw[{type(raw)}]()={len(raw)}")
+ logger.debug("raw[%s]()=%d", type(raw), len(raw))
doc = bs4.BeautifulSoup(raw, features='html.parser')
logger.debug("doc[]='%s'", type(doc))
except network.exceptions as exception:
- logger.warning(f"Cannot fetch software='{software}' from fediverse.observer: '{type(exception)}'")
+ logger.warning("Cannot fetch software='%s' from fediverse.observer: '%s'", software, type(exception))
continue
items = doc.findAll("a", {"class": "url"})
if not instances.is_registered(blocked):
try:
- logger.info("Fetching instances from domain='%s' ...", row['domain'])
+ logger.info("Fetching instances from domain='%s' ...", blocked)
federation.fetch_instances(blocked, 'chaos.social', None, inspect.currentframe().f_code.co_name)
- logger.debug("Invoking cookies.clear(%s) ...", row['domain'])
+ logger.debug("Invoking cookies.clear(%s) ...", blocked)
cookies.clear(blocked)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_cs) from blocked='%s'", type(exception), blocked)
instances.set_last_error(blocked, exception)
if blocks.is_instance_blocked("todon.eu", blocked, block_level):
logger.debug(f"row='{row}'")
if not instances.is_registered(row["domain"]):
try:
- logger.info("Fetching instances from domain='%s' ...", row['domain'])
+ logger.info("Fetching instances from domain='%s' ...", row["domain"])
federation.fetch_instances(row["domain"], 'chaos.social', None, inspect.currentframe().f_code.co_name)
- logger.debug("Invoking cookies.clear(%s) ...", row['domain'])
+ logger.debug("Invoking cookies.clear(%s) ...", row["domain"])
cookies.clear(row["domain"])
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_cs) from domain='{row['domain']}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_cs) from row[domain]='%s'", type(exception), row["domain"])
instances.set_last_error(row["domain"], exception)
if not blocks.is_instance_blocked('chaos.social', row["domain"], block_level):
- logger.debug(f"domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
+ logger.debug("domain='%s',block_level='%s' blocked by chaos.social, adding ...", row["domain"], block_level)
blocks.add_instance('chaos.social', row["domain"], row["reason"], block_level)
logger.debug("Invoking commit() ...")
logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_fba_rss) from domain='{domain}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_fba_rss) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
logger.debug("EXIT!")
logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_fbabot_atom) from domain='{domain}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_fbabot_atom) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
logger.debug("EXIT!")
logger.debug(f"Invoking cookies.clear({args.domain}) ...")
cookies.clear(args.domain)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_instances) from args.domain='{args.domain}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_instances) from args.domain='%s'", type(exception), args.domain)
instances.set_last_error(args.domain, exception)
return 100
logger.debug(f"Invoking cookies.clear({row[0]}) ...")
cookies.clear(row[0])
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_instances) from domain='{row[0]}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_instances) from row[0]='%s'", type(exception), row[0])
instances.set_last_error(row[0], exception)
logger.debug("Success - EXIT!")
logger.debug(f"Setting key='{key}',sub[{type(sub)}]='{sub}',value[]='{type(value)}'")
_cache[key][sub[0]] = value
else:
- logger.warning(f"Unsupported type sub[]='{type(sub)}'")
+ logger.warning("Unsupported type sub[]='%s'", type(sub))
logger.debug("EXIT!")
]
def remove(software: str) -> str:
- logger.debug(f"software='{software}' - CALLED!")
+ logger.debug("software='%s' - CALLED!", software)
if "." not in software and " " not in software:
- logger.warning(f"software='{software}' does not contain a version number.")
+ logger.warning("software='%s' does not contain a version number.", software)
return software
temp = software
logger.debug(f"version[{type(version)}]='{version}',match='{match}'")
if not isinstance(match, re.Match):
- logger.warning(f"version='{version}' does not match regex, leaving software='{software}' untouched.")
+ logger.warning("version='%s' does not match regex, leaving software='%s' untouched.", version, software)
return software
logger.debug(f"Found valid version number: '{version}', removing it ...")
logger.debug(f"software='{software}' contains word ' version'")
software = strip_until(software, " version")
- logger.debug(f"software='{software}' - EXIT!")
+ logger.debug("software='%s' - EXIT!", software)
return software
def strip_powered_by(software: str) -> str:
- logger.debug(f"software='{software}' - CALLED!")
+ logger.debug("software='%s' - CALLED!", software)
if not isinstance(software, str):
raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
elif software == "":
raise ValueError("Parameter 'software' is empty")
elif "powered by" not in software:
- logger.warning(f"Cannot find 'powered by' in software='{software}'!")
+ logger.warning("Cannot find 'powered by' in software='%s'!", software)
return software
start = software.find("powered by ")
software = strip_until(software, " - ")
- logger.debug(f"software='{software}' - EXIT!")
+ logger.debug("software='%s' - EXIT!", software)
return software
def strip_hosted_on(software: str) -> str:
- logger.debug(f"software='{software}' - CALLED!")
+ logger.debug("software='%s' - CALLED!", software)
if not isinstance(software, str):
raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
elif software == "":
raise ValueError("Parameter 'software' is empty")
elif "hosted on" not in software:
- logger.warning(f"Cannot find 'hosted on' in '{software}'!")
+ logger.warning("Cannot find 'hosted on' in sofware='%s'!", software)
return software
end = software.find("hosted on ")
software = strip_until(software, " - ")
- logger.debug(f"software='{software}' - EXIT!")
+ logger.debug("software='%s' - EXIT!", software)
return software
def strip_until(software: str, until: str) -> str:
elif until == "":
raise ValueError("Parameter 'until' is empty")
elif not until in software:
- logger.warning(f"Cannot find '{until}' in '{software}'!")
+ logger.warning("Cannot find until='%s' in software='%s'!", until, software)
return software
# Next, strip until part
if end > 0:
software = software[0:end].strip()
- logger.debug(f"software='{software}' - EXIT!")
+ logger.debug("software='%s' - EXIT!", software)
return software
from fba import csrf
from fba import utils
-from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import tidyup
from fba.helpers import version
try:
software = determine_software(domain, path)
except network.exceptions as exception:
- logger.debug(f"Exception '{type(exception)}' during determining software type")
- pass
+ logger.warning("Exception '%s' during determining software type", type(exception))
+ instances.set_last_error(domain, exception)
logger.debug(f"Determined software='{software}' for domain='{domain}'")
elif not isinstance(software, str):
return data
def fetch_generator_from_path(domain: str, path: str = "/") -> str:
- logger.debug(f"domain({len(domain)})='{domain}',path='{path}' - CALLED!")
+ logger.debug("domain(%d)='%s',path='%s' - CALLED!", len(domain), domain, path)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.debug(f"software='{software}' has ' see ' in it")
software = version.strip_until(software, " see ")
- logger.debug(f"software='{software}' - EXIT!")
+ logger.debug("software='%s' - EXIT!", software)
return software
def determine_software(domain: str, path: str = None) -> str:
- logger.debug(f"domain({len(domain)})='{domain}',path='{path}' - CALLED!")
+ logger.debug("domain(%d)='%s',path='%s' - CALLED!", len(domain), domain, path)
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
logger.debug("Returning None - EXIT!")
return None
- sofware = tidyup.domain(software)
+ software = tidyup.domain(software)
logger.debug("sofware after tidyup.domain():", software)
if software in ["akkoma", "rebased", "akkounfucked", "ched"]:
logger.debug("domain,origin,command,path:", domain, origin, command, path)
software = federation.determine_software(domain, path)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during determining software type")
+ logger.warning("Exception '%s' during determining software type, domain='%s'", type(exception), domain)
set_last_error(domain, exception)
logger.debug("Determined software:", software)
if software == "lemmy" and domain.find("/c/") > 0:
domain = domain.split("/c/")[0]
if is_registered(domain):
- logger.warning(f"domain='{domain}' already registered after cutting off user part. - EXIT!")
+ logger.warning("domain='%s' already registered after cutting off user part. - EXIT!", domain)
return
logger.info("Adding instance domain='%s' (origin='%s',software='%s')", domain, origin, software)
from fba import utils
-from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import tidyup
from fba import database
from fba import utils
-from fba.helpers import blacklist
from fba.helpers import config
from fba.helpers import tidyup
try:
# json endpoint for newer mastodongs
found_blocks = list()
- blocklist = list()
-
- rows = {
- "reject" : [],
- "media_removal" : [],
- "followers_only": [],
- "report_removal": [],
- }
logger.debug(f"Fetching /instances from domain='{domain}'")
response = network.fetch_response(
logger.debug(f"row[{type(row)}]='{row}'")
if row is None:
- logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!")
+ logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
return False
logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
logger.debug(f"row[{type(row)}]='{row}'")
if row is None:
- logger.warning(f"Cannot de-obfucate domain='{domain}' - SKIPPED!")
+ logger.warning("Cannot de-obfucate domain='%s' - SKIPPED!", domain)
return False
logger.debug(f"domain='{domain}' de-obscured to '{row[0]}'")
logger.debug("Invoking cookies.clear(%s) ...", domain)
cookies.clear(domain)
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
+ logger.warning("Exception '%s' during fetching instances (fetch_oliphant) from domain='%s'", type(exception), domain)
instances.set_last_error(domain, exception)
logger.debug(f"processed='{processed}' - EXIT!")