"misskeytest.chn.moe",
# block flooder
"everyoneattack.com",
+ # responds with very long reply
+ "grossard.fr",
]
def is_blacklisted(domain: str) -> bool:
logger.debug(f"instance='{instance}' - AFTER")
if instance == "":
- logger.warning(f"Empty instance after tidyup.domain(), domain='{domain}'")
+ logger.warning("Empty instance after tidyup.domain(), domain='%s'", domain)
continue
elif not utils.is_domain_wanted(instance):
logger.debug("instance='%s' is not wanted - SKIPPED!", instance)
logger.debug("instance='%s' is a link to a single user profile - SKIPPED!", instance)
continue
elif not instances.is_registered(instance):
- logger.debug("Adding new instance:", instance, domain)
+ logger.debug("Adding new instance='%s',domain='%s',command='%s'", instance, domain, command)
instances.add(instance, domain, command)
logger.debug("EXIT!")
logger.debug("Checking CSRF for domain='%s'", domain)
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during checking CSRF (fetch_peers,{__name__}) - EXIT!")
+ logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
return peers
logger.debug("data[]='%s'", type(data))
if "error_message" in data:
- logger.warning(f"Could not reach any JSON API at domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
+ logger.warning("Could not reach any JSON API at domain='%s',status_code='%d',error_message='%s'", domain, data['status_code'], data['error_message'])
elif "federated_instances" in data["json"]:
- logger.debug(f"Found federated_instances for domain='{domain}'")
+ logger.debug("Found federated_instances for domain='%s'", domain)
peers = peers + add_peers(data["json"]["federated_instances"])
logger.debug("Added instance(s) to peers")
else:
logger.debug("software='%s' has 'powered by' in it", software)
software = version.remove(version.strip_powered_by(software))
- logger.debug("Returning domain='%s',software='%s'", domain, software)
+ logger.debug("software='%s' - EXIT!", domain, software)
return software
def find_domains(tag: bs4.element.Tag) -> list:
urllib3.exceptions.LocationParseError
)
-def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict:
- logger.debug(f"domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!")
+def post_json_api(domain: str, path: str, data: str = "", headers: dict = dict()) -> dict:
+ logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers))
domain_helper.raise_on(domain)
if not isinstance(path, str):
json_reply["json"] = json_from_response(response)
- logger.debug(f"response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
+ logger.debug("response.ok='%s',response.status_code='%d',json_reply[]='%s'", response.ok, response.status_code, type(json_reply))
if not response.ok or response.status_code >= 400:
- logger.warning(f"Cannot query JSON API: domain='{domain}',path='{path}',data()={len(data)},response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ logger.warning("Cannot query JSON API: domain='%s',path='%s',data()=%d,response.status_code='%d',json_reply[]='%s'", domain, path, len(data), response.status_code, type(json_reply))
json_reply["status_code"] = response.status_code
json_reply["error_message"] = response.reason
del json_reply["json"]
instances.set_last_error(domain, response)
except exceptions as exception:
- logger.debug(f"Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+ logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
json_reply["status_code"] = 999
json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
json_reply["exception"] = exception
instances.set_last_error(domain, exception)
raise exception
- logger.debug(f"Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
return json_reply
def fetch_api_url(url: str, timeout: tuple) -> dict:
- logger.debug(f"url='{url}',timeout()={len(timeout)} - CALLED!")
+ logger.debug("url='%s',timeout()=%d - CALLED!", url, len(timeout))
if not isinstance(url, str):
raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
+ elif url == "":
+ raise ValueError("Parameter 'url' is empty")
elif not isinstance(timeout, tuple):
raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
json_reply["json"] = json_from_response(response)
- logger.debug(f"response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
+ logger.debug("response.ok='%s',response.status_code='%s',json_reply[]='%s'", response.ok, response.status_code, type(json_reply))
if not response.ok or response.status_code >= 400:
- logger.warning(f"Cannot query JSON API: url='{url}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ logger.warning("Cannot query JSON API: url='%s',response.status_code='%d',json_reply[]='%s'", url, response.status_code, type(json_reply))
json_reply["status_code"] = response.status_code
json_reply["error_message"] = response.reason
del json_reply["json"]
except exceptions as exception:
- logger.debug(f"Fetching '{url}' failed. exception[{type(exception)}]='{str(exception)}'")
+ logger.debug("Fetching url='%s' failed. exception[%s]='%s'", url, type(exception), str(exception))
json_reply["status_code"] = 999
json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
json_reply["exception"] = exception
raise exception
- logger.debug(f"Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
return json_reply
def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
- logger.debug(f"domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!")
+ logger.debug("domain='%s',path='%s',timeout()=%d - CALLED!", domain, path, len(timeout))
domain_helper.raise_on(domain)
if not isinstance(path, str):
json_reply["json"] = json_from_response(response)
- logger.debug(f"response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
+ logger.debug("response.ok='%s',response.status_code='%d',json_reply[]='%s'", response.ok, response.status_code, type(json_reply))
if not response.ok or response.status_code >= 400:
- logger.warning(f"Cannot query JSON API: domain='{domain}',path='{path}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ logger.warning("Cannot query JSON API: domain='%s',path='%s',response.status_code='%d',json_reply[]='%s'", domain, path, response.status_code, type(json_reply))
json_reply["status_code"] = response.status_code
json_reply["error_message"] = response.reason
del json_reply["json"]
instances.set_last_error(domain, response)
- logger.debug(f"Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ logger.debug("Returning json_reply(%d)[]='%s' - EXIT!", len(json_reply), type(json_reply))
return json_reply
def send_bot_post(domain: str, blocklist: dict):
- logger.debug(f"domain='{domain}',blocklist()={len(blocklist)} - CALLED!")
+ logger.debug("domain='%s',blocklist()=%d - CALLED!", domain, len(blocklist))
domain_helper.raise_on(domain)
if not isinstance(blocklist, dict):
truncated = True
blocklist = blocklist[0 : 19]
- logger.debug(f"blocklist()={len(blocklist)}")
+ logger.debug("blocklist()=%d", len(blocklist))
for block in blocklist:
- logger.debug(f"block['{type(block)}']={block}")
+ logger.debug("block[%s]=%s", type(block), block)
if block["reason"] is None or block["reason"] == '':
message = message + block["blocked"] + " with unspecified reason\n"
else:
return True
def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response:
- logger.debug(f"domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
+ logger.debug("domain='%s',path='%s',headers()=%d,timeout='%s' - CALLED!", domain, path, len(headers), timeout)
domain_helper.raise_on(domain)
if not isinstance(path, str):
raise ValueError(f"timeout[]='{type(timeout)}' is not 'tuple'")
try:
- logger.debug(f"Sending GET request to '{domain}{path}' ...")
+ logger.debug("Sending GET request to '%s%s' ...", domain, path)
response = reqto.get(
f"https://{domain}{path}",
headers=headers,
)
except exceptions as exception:
- logger.debug(f"Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+ logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
instances.set_last_error(domain, exception)
raise exception
- logger.debug(f"response[]='{type(response)}' - EXXIT!")
+ logger.debug("response[]='%s' - EXIT!", type(response))
return response
def json_from_response(response: requests.models.Response) -> list:
- logger.debug(f"response[]='{type(response)}' - CALLED!")
+ logger.debug("response[]='%s' - CALLED!", type(response))
if not isinstance(response, requests.models.Response):
raise ValueError(f"Parameter response[]='{type(response)}' is not type of 'Response'")
data = list()
if response.text.strip() != "":
- logger.debug(f"response.text()={len(response.text)} is not empty, invoking response.json() ...")
+ logger.debug("response.text()=%d is not empty, invoking response.json() ...", len(response.text))
try:
data = response.json()
except json.decoder.JSONDecodeError:
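+ # Response body is not valid JSON; fall through and keep "data" as an empty list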
pass
- logger.debug(f"data[]='{type(data)}' - EXIT!")
+ logger.debug("data[]='%s' - EXIT!", type(data))
return data
def deobscure(char: str, domain: str, blocked_hash: str = None) -> tuple:
logger.debug("char='%s',domain='%s',blocked_hash='%s' - CALLED!", char, domain, blocked_hash)
- domain_helper.raise_on(domain)
- if not isinstance(char, str):
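+ # The inline checks below replace domain_helper.raise_on(): an obscured domain can still contain masking characters (e.g. "*"), which the stricter domain validation would likely reject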
+ if not isinstance(domain, str):
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ elif domain == "":
+ raise ValueError("Parameter 'domain' is empty")
+ elif domain.lower() != domain:
+ raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
+ elif domain.endswith(".arpa"):
+ raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
+ elif domain.endswith(".tld"):
+ raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
+ elif not isinstance(char, str):
raise ValueError(f"Parameter char[]='{type(char)}' is not 'str'")
elif char == "":
raise ValueError("Parameter 'char' is empty")
if isinstance(blocked_hash, str):
logger.debug("Looking up blocked_hash='%s',domain='%s' ...", blocked_hash, domain)
database.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? OR domain LIKE ? LIMIT 1", [blocked_hash, domain.replace("_")]
+ "SELECT domain, origin, nodeinfo_url FROM instances WHERE hash = ? OR domain LIKE ? LIMIT 1", [blocked_hash, domain.replace(char, "_")]
)
row = database.cursor.fetchone()
})
logger.debug("Next!")
- logger.debug("Returning blocklist() for domain:", domain, len(blocklist))
+ logger.debug("Returning blocklist()=%d for domain='%s' - EXIT!", len(blocklist), domain)
return {
"reject": blocklist
}
break
except network.exceptions as exception:
- logger.warning(f"Cannot fetch from domain='{domain}',exception='{type(exception)}'")
+ logger.warning("Cannot fetch from domain='%s',exception='%s'", domain, type(exception))
instances.set_last_error(domain, exception)
break
logger.debug("doc[]='%s'", type(doc))
if doc is None:
- logger.warning(f"Cannot fetch any /about pages for domain='{domain}' - EXIT!")
+ logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
return blocklist
for header in doc.find_all("h3"):
logger.debug("header_text='%s'", header_text)
if header_text in language_mapping:
- logger.debug("header_text='%s'", header_text)
+ logger.debug("Translating header_text='%s' ...", header_text)
header_text = language_mapping[header_text]
else:
- logger.warning(f"header_text='{header_text}' not found in language mapping table")
+ logger.warning("header_text='%s' not found in language mapping table", header_text)
if header_text in blocklist or header_text.lower() in blocklist:
# replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
"reason": tidyup.reason(line.find_all("td")[1].text),
})
else:
- logger.warning(f"header_text='{header_text}' not found in blocklist()={len(blocklist)}")
+ logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist))
- logger.debug("Returning blocklist for domain:", domain)
+ logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
return {
"reject" : blocklist["Suspended servers"],
"media_removal" : blocklist["Filtered media"],
logger.debug("Checking CSRF for domain='%s'", domain)
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during checking CSRF (fetch_blocks,{__name__}) - EXIT!")
+ logger.warning("Exception '%s' during checking CSRF (fetch_blocks,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
return
"report_removal": [],
}
- logger.debug("Querying API domain_blocks:", domain)
+ logger.debug("Querying API domain_blocks: domain='%s'", domain)
data = network.get_json_api(
domain,
"/api/v1/instance/domain_blocks",
instances.set_last_error(domain, data)
return
elif "json" in data and "error" in data["json"]:
- logger.warning(f"JSON API returned error message: '{data['json']['error']}'")
+ logger.warning("JSON API returned error message: '%s'", data['json']['error'])
instances.set_last_error(domain, data)
return
else:
logger.debug("domain='%s' skipping block_level='accept'", domain)
continue
- logger.debug(f"Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
+ logger.debug("Checking %s entries from domain='{domain}',block_level='{block_level}' ...", len(blocklist))
for block in blocklist:
logger.debug("block[]='%s'", type(block))
blocked, blocked_hash, reason = block.values()
- logger.debug(f"blocked='{blocked}',blocked_hash='{blocked_hash}',reason='{reason}':")
+
+ logger.debug("blocked='%s',blocked_hash='%s',reason='%s'", blocked, blocked_hash, reason)
blocked = tidyup.domain(blocked)
reason = tidyup.reason(reason) if reason is not None and reason != "" else None
logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
if blocked == "":
logger.warning("blocked is empty, domain='%s'", domain)
continue
- elif blacklist.is_blacklisted(blocked):
- logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
- continue
elif blocked.count("*") > 0:
# Doing the hash search for instance names as well to tidy up DB
row = instances.deobscure("*", blocked, blocked_hash)
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',blocked_hash='%s' - SKIPPED!", blocked, blocked_hash)
continue
logger.debug("Updating domain: row[0]='%s'", row[0])
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',blocked_hash='{blocked_hash}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',blocked_hash='%s' - SKIPPED!", blocked, blocked_hash)
continue
logger.debug("Updating domain: row[0]='%s'", row[0])
origin = row[1]
nodeinfo_url = row[2]
- logger.debug("Looking up instance by domain:", blocked)
+ logger.debug("Looking up instance by domain: blocked='%s'", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
+ elif blacklist.is_blacklisted(blocked):
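+ # Check the (possibly de-obscured) blocked domain against the local blacklist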
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ continue
elif not instances.is_registered(blocked):
- logger.debug(f"Domain blocked='{blocked}' wasn't found, adding ..., domain='{domain}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
+ logger.debug(f"Domain blocked='%s' wasn't found, adding ..., domain='%s',origin='%s',nodeinfo_url='%s'", blocked, domain, origin, nodeinfo_url)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
- logger.debug("Looking up instance by domain:", blocked)
+ logger.debug("Looking up instance by domain: blocked='%s'", blocked)
if not utils.is_domain_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
elif not instances.is_registered(blocked):
- logger.debug("Hash wasn't found, adding:", blocked, domain)
+ logger.debug("Hash wasn't found, adding: blocked='%s',domain='%s'", blocked, domain)
instances.add(blocked, domain, inspect.currentframe().f_code.co_name, nodeinfo_url)
if not blocks.is_instance_blocked(domain, blocked, block_level):
- logger.debug("Blocking:", domain, blocked, block_level)
+ logger.debug("Blocking domain='%s',blocked='%s',block_level='%s' ...", domain, blocked, block_level)
blocks.add_instance(domain, blocked, reason, block_level)
if block_level == "reject":
"reason" : reason
})
else:
- logger.debug(f"Updating block last seen and reason for domain='{domain}',blocked='{blocked}' ...")
+ logger.debug("Updating block last seen and reason for domain='%s',blocked='%s' ...", domain, blocked)
blocks.update_last_seen(domain, blocked, block_level)
blocks.update_reason(reason, domain, blocked, block_level)
logger.debug("Invoking commit() ...")
database.connection.commit()
except network.exceptions as exception:
- logger.warning(f"domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ logger.warning("domain='%s',exception[%s]='%s'", domain, type(exception), str(exception))
instances.set_last_error(domain, exception)
logger.debug("EXIT!")
logger.debug("Checking CSRF for domain='%s'", domain)
headers = csrf.determine(domain, dict())
except network.exceptions as exception:
- logger.warning(f"Exception '{type(exception)}' during checking CSRF (fetch_peers,{__name__}) - EXIT!")
+ logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
return peers
logger.debug(f"Found host='{record[mode2]['host']}', adding ...")
peers.append(record[mode2]["host"])
else:
- logger.warning(f"record from '{domain}' has no '{mode2}' or 'host' record: {record}")
+ logger.warning("Record from domain='%s' has no mode2='%s' or 'host' record[]='%s", domain, mode2, type(record))
if len(rows) < 100:
- logger.debug(f"Reached end of JSON response, domain='{domain}'")
+ logger.debug("Reached end of JSON response, domain='%s'", domain)
break
# Continue with next row
start = start + 100
else:
- logger.warning(f"domain='{domain}' causes error during API query: '{data['error_message']}' - SKIPPED!")
+ logger.warning("domain='%s' causes error during API query: '%s' - SKIPPED!", domain, data['error_message'])
break
- logger.debug(f"Adding '{len(peers)}' for domain='{domain}'")
+ logger.debug("Adding %d peers for domain='%s'", len(peers), domain)
instances.set_total_peers(domain, peers)
- logger.debug(f"Returning peers[]='{type(peers)}'")
+ logger.debug("Returning peers[]='%s' - EXIT!", type(peers))
return peers
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
logger.debug("row[]='%s'", type(row))
if row is None:
- logger.warning(f"Cannot deobsfucate blocked='{blocked}',domain='{domain}',origin='{origin}' - SKIPPED!")
+ logger.warning("Cannot deobsfucate blocked='%s',domain='%s',origin='%s' - SKIPPED!", blocked, domain, origin)
continue
logger.debug("blocked='%s' de-obscured to '%s'", blocked, row[0])
else:
logger.warning("header_text='%s' not found in blocklist()=%d", header_text, len(blocklist))
- logger.debug("Returning blocklist for domain='%s' ...", domain)
+ logger.debug("Returning blocklist for domain='%s' - EXIT!", domain)
return {
"reject" : blocklist["Suspended servers"],
"media_removal" : blocklist["Filtered media"],
else:
response = network.fetch_response(components.netloc, components.path if isinstance(components.path, str) and components.path != '' else '/', headers, timeout)
- logger.debug(f"response[]='{type(response)}' - EXXIT!")
+ logger.debug(f"response[]='{type(response)}' - EXIT!")
return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
logger.debug("domains()=%d - EXIT!", len(domains))
return domains
-def is_domain_wanted (domain: str) -> bool:
+def is_domain_wanted(domain: str) -> bool:
logger.debug("domain='%s' - CALLED!", domain)
wanted = True