- # No CSRF by default, you don't have to add network.api_headers by yourself here
- headers = tuple()
- data = dict()
-
- try:
- # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'")
- headers = csrf.determine(domain, dict())
- except network.exceptions as exception:
- print(f"WARNING: Exception '{type(exception)}' during checking CSRF (nodeinfo,{__name__}) - EXIT!")
- instances.set_last_error(domain, exception)
- return {
- "status_code" : 500,
- "error_message": f"exception[{type(exception)}]='{str(exception)}'",
- "exception" : exception,
- }
-
- request_paths = [
- "/nodeinfo/2.1.json",
- "/nodeinfo/2.1",
- "/nodeinfo/2.0.json",
- "/nodeinfo/2.0",
- "/nodeinfo/1.0",
- "/api/v1/instance"
- ]
-
- for request in request_paths:
- # DEBUG: print(f"DEBUG: path[{type(path)}]='{path}',request='{request}'")
- if path is None or path == request or path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
- # DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...")
- if path == f"http://{domain}{path}" or path == f"https://{domain}{path}":
- # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' has protocol in path, splitting ...")
- components = urlparse(path)
- path = components.path
-
- data = network.get_json_api(
- domain,
- request,
- headers,
- (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
- )
-
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
- if "error_message" not in data:
- # DEBUG: print("DEBUG: Success:", request)
- instances.set_detection_mode(domain, "STATIC_CHECK")
- instances.set_nodeinfo_url(domain, request)
- break
-
- print(f"WARNING: Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
-
- # DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
- return data
-
-def fetch_wellknown_nodeinfo(domain: str) -> dict:
- # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
- if not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
- elif domain == "":
- raise ValueError("Parameter 'domain' is empty")
- elif domain.lower() != domain:
- raise ValueError(f"Parameter domain='{domain}' must be all lower-case")
- elif not validators.domain(domain.split("/")[0]):
- raise ValueError(f"domain='{domain}' is not a valid domain")
- elif domain.endswith(".arpa"):
- raise ValueError(f"domain='{domain}' is a domain for reversed IP addresses, please don't crawl them!")
- elif domain.endswith(".tld"):
- raise ValueError(f"domain='{domain}' is a fake domain, please don't crawl them!")
-
- # No CSRF by default, you don't have to add network.api_headers by yourself here
- headers = tuple()
-
- try:
- # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'")
- headers = csrf.determine(domain, dict())
- except network.exceptions as exception:
- print(f"WARNING: Exception '{type(exception)}' during checking CSRF (fetch_wellknown_nodeinfo,{__name__}) - EXIT!")
- instances.set_last_error(domain, exception)
- return {
- "status_code" : 500,
- "error_message": type(exception),
- "exception" : exception,
- }
-
- # DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
- data = network.get_json_api(
- domain,
- "/.well-known/nodeinfo",
- headers,
- (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
- )