import reqto
from fba import config
-#from fba import instances
from fba import network
def determine(domain: str, headers: dict) -> dict:
- # DEBUG: print(f"DEBUG: domain='{domain}',headers()={len(headers)} - CALLED!")
+ print(f"DEBUG: domain='{domain}',headers()={len(headers)} - CALLED!")
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
# Default headers with no CSRF
reqheaders = headers
- try:
- # Fetch / to check for meta tag indicating csrf
- # DEBUG: print(f"DEBUG: Fetching / from domain='{domain}' for CSRF check ...")
- response = reqto.get(
- f"https://{domain}/",
- headers=network.web_headers,
- timeout=(config.get("connection_timeout"), config.get("read_timeout"))
+ # Fetch / to check for meta tag indicating csrf
+ print(f"DEBUG: Fetching / from domain='{domain}' for CSRF check ...")
+ response = reqto.get(
+ f"https://{domain}/",
+ headers=network.web_headers,
+ timeout=(config.get("connection_timeout"), config.get("read_timeout"))
+ )
+
+ print(f"DEBUG: response.ok='{response.ok}',response.status_code={response.status_code},response.text()={len(response.text)}")
+ if response.ok and len(response.text) > 0:
+ meta = bs4.BeautifulSoup(
+ response.text,
+ "html.parser"
)
+ print(f"DEBUG: meta[]='{type(meta)}'")
+ tag = meta.find("meta", attrs={"name": "csrf-token"})
- # DEBUG: print(f"DEBUG: response.ok='{response.ok}',response.status_code={response.status_code},response.text()={len(response.text)}")
- if response.ok and len(response.text) > 0:
- meta = bs4.BeautifulSoup(
- response.text,
- "html.parser"
- )
- # DEBUG: print(f"DEBUG: meta[]='{type(meta)}'")
-
- tag = meta.find("meta", attrs={"name": "csrf-token"})
- # DEBUG: print(f"DEBUG: tag={tag}")
-
- # DEBUG: print(f"DEBUG: Adding CSRF token='{tag['content']}' for domain='{domain}'")
+ print(f"DEBUG: tag={tag}")
+ if tag is not None:
+ print(f"DEBUG: Adding CSRF token='{tag['content']}' for domain='{domain}'")
reqheaders["X-CSRF-Token"] = tag["content"]
- except BaseException as exception:
- # DEBUG: print(f"DEBUG: No CSRF token found, using normal headers: domain='{domain}',exception[{type(exception)}]={exception}")
- pass
-
- # DEBUG: print(f"DEBUG: reqheaders()={len(reqheaders)} - EXIT!")
+ print(f"DEBUG: reqheaders()={len(reqheaders)} - EXIT!")
return reqheaders
return peertube.fetch_peers(domain)
# DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...")
- peers = list()
- response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
-
- data = network.json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
-
- if not response.ok or response.status_code >= 400:
+ data = list()
+ # Bind the reply to 'data': the "error_message" check that follows reads 'data', not 'response'
+ data = network.get_json_api(
+ domain,
+ "/api/v1/instance/peers",
+ (config.get("connection_timeout"), config.get("read_timeout"))
+ )
+ # DEBUG: print(f"DEBUG: data[]='{type(data)}'")
+
+ if "error_message" in data:
# DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...")
- response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ data = network.get_json_api(
+ domain,
+ "/api/v3/site",
+ (config.get("connection_timeout"), config.get("read_timeout"))
+ )
- data = network.json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
- if not response.ok or response.status_code >= 400:
- print("WARNING: Could not reach any JSON API:", domain)
- instances.update_last_error(domain, response)
- elif response.ok and isinstance(data, list):
- # DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
- sys.exit(255)
+ if "error_message" in data:
+ print(f"WARNING: Could not reach any JSON API at domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
elif "federated_instances" in data:
# DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
peers = peers + add_peers(data["federated_instances"])
continue
# DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...")
- response = network.fetch_response(domain, request, network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
+ data = network.get_json_api(
+ domain,
+ request,
+ (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
+ )
- data = network.json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
- if response.ok and isinstance(data, dict):
+ if "error_message" not in data:
# DEBUG: print("DEBUG: Success:", request)
instances.set_data("detection_mode", domain, "STATIC_CHECK")
instances.set_data("nodeinfo_url" , domain, request)
break
- elif response.ok and isinstance(data, list):
- print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
- sys.exit(255)
- elif not response.ok or response.status_code >= 400:
- print("WARNING: Failed fetching nodeinfo from domain:", domain)
- instances.update_last_error(domain, response)
- continue
+
+ print(f"WARNING: Failed fetching nodeinfo from domain='{domain}',status_code='{data['status_code']}',error_message='{data['error_message']}'")
# DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
return data
raise ValueError("Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
- response = network.fetch_response(domain, "/.well-known/nodeinfo", network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
+ data = network.get_json_api(
+ domain,
+ "/.well-known/nodeinfo",
+ (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
+ )
- data = network.json_from_response(response)
- # DEBUG: print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data))
- if response.ok and isinstance(data, dict):
+ if "error_message" not in data:
nodeinfo = data
# DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain)
if "links" in nodeinfo:
# DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
if link["rel"] in nodeinfo_identifier:
# DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
- response = fba.fetch_url(link["href"], network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = fba.fetch_url(
+ link["href"],
+ network.api_headers,
+ (config.get("connection_timeout"), config.get("read_timeout"))
+ )
data = network.json_from_response(response)
# DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
instances.set_data("detection_mode", domain, "AUTO_DISCOVERY")
instances.set_data("nodeinfo_url" , domain, link["href"])
break
+ else:
+ instances.update_last_error(domain, data)
else:
print("WARNING: Unknown 'rel' value:", domain, link["rel"])
else:
software = tidyup.domain(generator.get("content"))
print(f"INFO: domain='{domain}' is generated by '{software}'")
instances.set_data("detection_mode", domain, "GENERATOR")
- fba.remove_pending_error(domain)
elif isinstance(site_name, bs4.element.Tag):
# DEBUG: print("DEBUG: Found property=og:site_name:", domain)
sofware = tidyup.domain(site_name.get("content"))
print(f"INFO: domain='{domain}' has og:site_name='{software}'")
instances.set_data("detection_mode", domain, "SITE_NAME")
- fba.remove_pending_error(domain)
# DEBUG: print(f"DEBUG: software[]={type(software)}")
if isinstance(software, str) and software == "":
"last_status_code" : {},
# Last error details
"last_error_details" : {},
+ # Whether CSRF tokens are present
+ "has_csrf" : {},
}
def set_data(key: str, domain: str, value: any):
"Content-Type": "application/json",
}
-def post_json_api(domain: str, path: str, parameter: str, extra_headers: dict = {}) -> dict:
- # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',parameter='{parameter}',extra_headers()={len(extra_headers)} - CALLED!")
+def post_json_api(domain: str, path: str, data: str, headers: dict = {}) -> dict:
+ # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',data='{data}',headers()={len(headers)} - CALLED!")
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"path[]={type(path)} is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' cannot be empty")
- elif not isinstance(parameter, str):
- raise ValueError(f"parameter[]={type(parameter)} is not 'str'")
+ elif not isinstance(data, str):
+ raise ValueError(f"data[]={type(data)} is not 'str'")
+ elif not isinstance(headers, dict):
+ # The message must name the type that is actually checked (dict)
+ raise ValueError(f"headers[]={type(headers)} is not 'dict'")
# DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
- headers = csrf.determine(domain, {**api_headers, **extra_headers})
+ headers = csrf.determine(domain, {**api_headers, **headers})
- data = {}
+ json_reply = {}
try:
- # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',parameter='{parameter}',extra_headers({len(extra_headers)})={extra_headers}")
+ # DEBUG: print(f"DEBUG: Sending POST to domain='{domain}',path='{path}',data='{data}',headers({len(headers)})={headers}")
response = reqto.post(
f"https://{domain}{path}",
- data=parameter,
+ data=data,
headers=headers,
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
except requests.exceptions.ConnectionError as exception:
# DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+ json_reply["status_code"] = 999
+ json_reply["error_message"] = f"exception['{type(exception)}']='{str(exception)}'"
instances.update_last_error(domain, exception)
raise exception
- data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ json_reply = json_from_response(response)
+
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
if not response.ok or response.status_code >= 400:
- print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',parameter()={len(parameter)},response.status_code='{response.status_code}',data[]='{type(data)}'")
+ print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',data()={len(data)},response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+ json_reply["status_code"] = response.status_code
+ json_reply["error_message"] = response.text
instances.update_last_error(domain, response)
- # DEBUG: print(f"DEBUG: Returning data({len(data)})=[]:{type(data)}")
- return data
+ # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+ return json_reply
+
+def get_json_api(domain: str, path: str, timeout: tuple) -> dict:
+    """Send a GET request to a JSON API endpoint and return the decoded reply.
+
+    Args:
+        domain: Host to query; must be a non-empty string.
+        path: Request path appended to "https://{domain}"; must be a non-empty string.
+        timeout: Timeout tuple handed to reqto.get() unchanged.
+
+    Returns:
+        The value produced by json_from_response() for the response. When the
+        request failed, the result carries the keys "status_code" and
+        "error_message"; status_code 999 marks a connection error that never
+        produced an HTTP response.
+
+    Raises:
+        ValueError: If a parameter has the wrong type or is an empty string.
+    """
+    # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!")
+    if not isinstance(domain, str):
+        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
+    elif domain == "":
+        raise ValueError("Parameter 'domain' is empty")
+    elif not isinstance(path, str):
+        raise ValueError(f"path[]={type(path)} is not 'str'")
+    elif path == "":
+        raise ValueError("Parameter 'path' cannot be empty")
+    elif not isinstance(timeout, tuple):
+        raise ValueError(f"timeout[]={type(timeout)} is not 'tuple'")
+
+    # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
+    # Hand over a copy: csrf.determine() stores the token in the dict it receives,
+    # so passing api_headers itself would leak one domain's token into later requests.
+    headers = csrf.determine(domain, {**api_headers})
+
+    try:
+        # DEBUG: print(f"DEBUG: Sending GET to domain='{domain}',path='{path}',timeout({len(timeout)})={timeout}")
+        response = reqto.get(
+            f"https://{domain}{path}",
+            headers=headers,
+            timeout=timeout
+        )
+
+    except requests.exceptions.ConnectionError as exception:
+        # DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' failed. exception[{type(exception)}]='{str(exception)}'")
+        instances.update_last_error(domain, exception)
+
+        # Return the error instead of re-raising it: callers test for
+        # "error_message" in the reply, which a raise would never let them see.
+        return {
+            "status_code": 999,
+            "error_message": f"exception['{type(exception)}']='{str(exception)}'",
+        }
+
+    json_reply = json_from_response(response)
+
+    # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},json_reply[]='{type(json_reply)}'")
+    if not response.ok or response.status_code >= 400:
+        print(f"WARNING: Cannot query JSON API: domain='{domain}',path='{path}',response.status_code='{response.status_code}',json_reply[]='{type(json_reply)}'")
+
+        # An error body is not guaranteed to decode to a dict; string keys can
+        # only be stored in one, so start from an empty dict otherwise.
+        if not isinstance(json_reply, dict):
+            json_reply = {}
+
+        json_reply["status_code"] = response.status_code
+        json_reply["error_message"] = response.text
+        instances.update_last_error(domain, response)
+
+    # DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
+    return json_reply
def send_bot_post(domain: str, blocklist: dict):
# DEBUG: print(f"DEBUG: domain={domain},blocklist()={len(blocklist)} - CALLED!")
return True
-def fetch_response(domain: str, path: str, headers: dict, timeout: list) -> requests.models.Response:
+def fetch_response(domain: str, path: str, headers: dict, timeout: tuple) -> requests.models.Response:
# DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',headers()={len(headers)},timeout={timeout} - CALLED!")
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
raise ValueError(f"Parameter path[]='{type(path)}' is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' is empty")
-
- # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}',headers()='{len(headers)}' ...")
- headers = csrf.determine(domain, headers)
+ elif not isinstance(headers, dict):
+ raise ValueError(f"headers[]={type(headers)} is not 'dict'")
+ elif not isinstance(timeout, tuple):
+ raise ValueError(f"timeout[]={type(timeout)} is not 'tuple'")
try:
# DEBUG: print(f"DEBUG: Sending GET request to '{domain}{path}' ...")
peers = list()
try:
# DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
- response = network.fetch_response(
+ data = network.get_json_api(
domain,
"/api/v3/site",
- network.api_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)
- data = network.json_from_response(response)
-
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
- if not response.ok or response.status_code >= 400:
+ # DEBUG: print(f"DEBUG: data['{type(data)}']='{data}'")
+ if "error_message" in data:
print("WARNING: Could not reach any JSON API:", domain)
instances.update_last_error(domain, response)
- elif response.ok and isinstance(data, list):
- print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
elif "federated_instances" in data:
# DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
peers = peers + federation.add_peers(data["federated_instances"])
# DEBUG: print("DEBUG: Added instance(s) to peers")
else:
print("WARNING: JSON response does not contain 'federated_instances':", domain)
- instances.update_last_error(domain, response)
+ instances.update_last_error(domain, data)
except BaseException as exception:
print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
}
# DEBUG: print("DEBUG: Querying API domain_blocks:", domain)
- response = network.fetch_response(
+ blocklist = network.get_json_api(
domain,
"/api/v1/instance/domain_blocks",
- network.api_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)
- # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
- blocklist = network.json_from_response(response)
+ if "error_message" in blocklist:
+ # The reply of this request is held in 'blocklist', so the details are read from it
+ print(f"WARNING: Was not able to fetch domain_blocks from domain='{domain}': status_code='{blocklist['status_code']}',error_message='{blocklist['error_message']}'")
+ instances.update_last_error(domain, blocklist)
print(f"INFO: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon' ...")
for block in blocklist:
for mode in ["followers", "following"]:
print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
- try:
- response = network.fetch_response(
- domain,
- "/api/v1/server/{mode}?start={start}&count=100",
- network.api_headers,
- (config.get("connection_timeout"), config.get("read_timeout"))
- )
+ # The path must be an f-string, otherwise the literal text "{mode}" and "{start}" is requested
+ data = network.get_json_api(
+ domain,
+ f"/api/v1/server/{mode}?start={start}&count=100",
+ (config.get("connection_timeout"), config.get("read_timeout"))
+ )
- data = network.json_from_response(response)
- print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',data[]='{type(data)}'")
- if response.ok and isinstance(data, dict):
- print("DEBUG: Success, data:", len(data))
- if "data" in data:
- print(f"DEBUG: Found {len(data['data'])} record(s).")
- for record in data["data"]:
- print(f"DEBUG: record()={len(record)}")
- if mode in record and "host" in record[mode]:
- print(f"DEBUG: Found host={record[mode]['host']}, adding ...")
- peers.append(record[mode]["host"])
- else:
- print(f"WARNING: record from '{domain}' has no '{mode}' or 'host' record: {record}")
+ print(f"DEBUG: data['{type(data)}']='{data}'")
+ if "error_message" not in data:
+ print("DEBUG: Success, data:", len(data))
+ if "data" in data:
+ print(f"DEBUG: Found {len(data['data'])} record(s).")
+ for record in data["data"]:
+ print(f"DEBUG: record()={len(record)}")
+ if mode in record and "host" in record[mode]:
+ print(f"DEBUG: Found host={record[mode]['host']}, adding ...")
+ peers.append(record[mode]["host"])
+ else:
+ print(f"WARNING: record from '{domain}' has no '{mode}' or 'host' record: {record}")
- if len(data["data"]) < 100:
- print("DEBUG: Reached end of JSON response:", domain)
- break
+ if len(data["data"]) < 100:
+ print("DEBUG: Reached end of JSON response:", domain)
+ break
- # Continue with next row
- start = start + 100
-
- except BaseException as exception:
- print(f"WARNING: Exception during fetching JSON: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ # Continue with next row
+ start = start + 100
print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
instances.set_data("total_peers", domain, len(peers))