from fba import blacklist
from fba import config
+from fba import csrf
from fba import fba
from fba import instances
from fba import network
# DEBUG: print(f"DEBUG: Invoking peertube.fetch_peers({domain}) ...")
return peertube.fetch_peers(domain)
+ # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'")
+ headers = csrf.determine(domain, dict())
+
# DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...")
data = network.get_json_api(
domain,
"/api/v1/instance/peers",
+ headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)
# DEBUG: print(f"DEBUG: data[]='{type(data)}'")
# DEBUG: print("DEBUG: nodeinfo()={len(nodeinfo))} - EXIT!")
return nodeinfo
+ # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'")
+ headers = csrf.determine(domain, dict())
+
request_paths = [
"/nodeinfo/2.1.json",
"/nodeinfo/2.1",
data = network.get_json_api(
domain,
request,
+ headers,
(config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
)
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
+ # DEBUG: print(f"DEBUG: Checking CSRF for domain='{domain}'")
+ headers = csrf.determine(domain, dict())
+
# DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
data = network.get_json_api(
domain,
"/.well-known/nodeinfo",
+ headers,
(config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout"))
)
"last_status_code" : {},
# Last error details
"last_error_details" : {},
- # Whether CSRF tokens are present
- "has_csrf" : {},
}
def set_data(key: str, domain: str, value: any):
# DEBUG: print("DEBUG: EXIT!")
def update_last_error(domain: str, error: dict):
- print("DEBUG: domain,error[]:", domain, type(error))
+ # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
- print("DEBUG: BEFORE error[]:", type(error))
+ # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
if isinstance(error, BaseException) or isinstance(error, json.decoder.JSONDecodeError):
error = f"error[{type(error)}]='{str(error)}'"
- print("DEBUG: AFTER error[]:", type(error))
+ # DEBUG: print("DEBUG: AFTER error[]:", type(error))
if isinstance(error, str):
- print(f"DEBUG: Setting last_error_details='{error}'")
+ # DEBUG: print(f"DEBUG: Setting last_error_details='{error}'")
set_data("last_status_code" , domain, 999)
set_data("last_error_details", domain, error)
elif isinstance(error, requests.models.Response):
- print(f"DEBUG: Setting last_error_details='{error.reason}'")
+ # DEBUG: print(f"DEBUG: Setting last_error_details='{error.reason}'")
set_data("last_status_code" , domain, error.status_code)
set_data("last_error_details", domain, error.reason)
else:
- print(f"DEBUG: Setting last_error_details='{error['error_message']}'")
+ # DEBUG: print(f"DEBUG: Setting last_error_details='{error['error_message']}'")
set_data("last_status_code" , domain, error["status_code"])
set_data("last_error_details", domain, error["error_message"])
# Running pending updated
- print(f"DEBUG: Invoking update_data({domain}) ...")
+ # DEBUG: print(f"DEBUG: Invoking update_data({domain}) ...")
update_data(domain)
fba.log_error(domain, error)
- print("DEBUG: EXIT!")
+ # DEBUG: print("DEBUG: EXIT!")
def is_registered(domain: str) -> bool:
# DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
def acquire():
global LOCK
- print("DEBUG: CALLED!")
+ # DEBUG: print("DEBUG: CALLED!")
try:
- print(f"DEBUG: Acquiring lock: '{lockfile}'")
+ # DEBUG: print(f"DEBUG: Acquiring lock: '{lockfile}'")
LOCK = zc.lockfile.LockFile(lockfile)
- print("DEBUG: Lock obtained.")
+ # DEBUG: print("DEBUG: Lock obtained.")
except zc.lockfile.LockError:
print(f"ERROR: Cannot aquire lock: '{lockfile}'")
sys.exit(100)
- print("DEBUG: EXIT!")
+ # DEBUG: print("DEBUG: EXIT!")
def release():
- print("DEBUG: CALLED!")
+ # DEBUG: print("DEBUG: CALLED!")
if LOCK is not None:
- print("DEBUG: Releasing lock ...")
+ # DEBUG: print("DEBUG: Releasing lock ...")
LOCK.close()
- print(f"DEBUG: Deleting lockfile='{lockfile}' ...")
+ # DEBUG: print(f"DEBUG: Deleting lockfile='{lockfile}' ...")
os.remove(lockfile)
- print("DEBUG: EXIT!")
+ # DEBUG: print("DEBUG: EXIT!")
import requests
from fba import config
-from fba import csrf
from fba import fba
from fba import instances
elif not isinstance(headers, dict):
raise ValueError(f"headers[]={type(headers)} is not 'list'")
- # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
- headers = csrf.determine(domain, {**api_headers, **headers})
-
json_reply = {
"status_code": 200,
}
response = reqto.post(
f"https://{domain}{path}",
data=data,
- headers=headers,
+ headers={**api_headers, **headers},
timeout=(config.get("connection_timeout"), config.get("read_timeout"))
)
# DEBUG: print(f"DEBUG: Returning json_reply({len(json_reply)})=[]:{type(json_reply)}")
return json_reply
-def get_json_api(domain: str, path: str, timeout: tuple) -> dict:
- # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',data='{data}',timeout()={len(timeout)} - CALLED!")
+def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
+ # DEBUG: print(f"DEBUG: domain='{domain}',path='{path}',timeout()={len(timeout)} - CALLED!")
if not isinstance(domain, str):
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError(f"path[]={type(path)} is not 'str'")
elif path == "":
raise ValueError("Parameter 'path' cannot be empty")
+ elif not isinstance(headers, dict):
+ raise ValueError(f"headers[]={type(headers)} is not 'list'")
elif not isinstance(timeout, tuple):
raise ValueError(f"timeout[]={type(timeout)} is not 'tuple'")
- # DEBUG: print(f"DEBUG: Determining if CSRF header needs to be sent for domain='{domain}' ...")
- headers = csrf.determine(domain, api_headers)
-
json_reply = {
"status_code": 200,
}
# DEBUG: print(f"DEBUG: Sending GET to domain='{domain}',path='{path}',timeout({len(timeout)})={timeout}")
response = reqto.get(
f"https://{domain}{path}",
- headers=headers,
+ headers={**api_headers, **headers},
timeout=timeout
)
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from fba import config
+from fba import csrf
from fba import federation
from fba import instances
from fba import network
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
+ print(f"DEBUG: Checking CSRF for domain='{domain}'")
+ headers = csrf.determine(domain, dict())
+
peers = list()
try:
# DEBUG: print(f"DEBUG: domain='{domain}' is Lemmy, fetching JSON ...")
data = network.get_json_api(
domain,
"/api/v3/site",
+ headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)
from fba import blacklist
from fba import blocks
from fba import config
+from fba import csrf
from fba import fba
from fba import instances
from fba import network
elif nodeinfo_url == "":
raise ValueError("Parameter 'nodeinfo_url' is empty")
+ print(f"DEBUG: Checking CSRF for domain='{domain}'")
+ headers = csrf.determine(domain, dict())
+
try:
# json endpoint for newer mastodongs
blockdict = list()
data = network.get_json_api(
domain,
"/api/v1/instance/domain_blocks",
+ headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)
from fba import blacklist
from fba import config
+from fba import csrf
from fba import instances
from fba import network
raise ValueError("Parameter 'domain' is empty")
# DEBUG: print(f"DEBUG: domain='{domain}' is misskey, sending API POST request ...")
- peers = list()
- offset = 0
- step = config.get("misskey_limit")
+ peers = list()
+ offset = 0
+ step = config.get("misskey_limit")
+ headers = csrf.determine(domain, {"Origin": domain})
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
"sort" : "+pubAt",
"host" : None,
"limit": step
- }), {
- "Origin": domain
- })
+ }), headers)
else:
fetched = network.post_json_api(domain, "/api/federation/instances", json.dumps({
"sort" : "+pubAt",
"host" : None,
"limit" : step,
"offset": offset - 1
- }), {
- "Origin": domain
- })
+ }), headers)
# DEBUG: print(f"DEBUG: fetched()={len(fetched)}")
if len(fetched) == 0:
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
- # DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
+ # DEBUG: print(f"DEBUG: Fetching misskey blocks from domain={domain}")
blocklist = {
"suspended": [],
"blocked" : []
}
- offset = 0
- step = config.get("misskey_limit")
+ offset = 0
+ step = config.get("misskey_limit")
+ headers = csrf.determine(domain, {"Origin": domain})
+
while True:
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
"host" : None,
"suspended": True,
"limit" : step
- }), {
- "Origin": domain
- })
+ }), headers)
else:
# DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
fetched = network.post_json_api(domain, "/api/federation/instances", json.dumps({
"suspended": True,
"limit" : step,
"offset" : offset - 1
- }), {
- "Origin": domain
- })
+ }), headers)
# DEBUG: print("DEBUG: fetched():", len(fetched))
if len(fetched) == 0:
try:
if offset == 0:
# DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
- fetched = network.post_json_api(domain, "/api/federation/instances", json.dumps({
+ fetched = network.post_json_api(domain, "/api/federation/instances", headers, json.dumps({
"sort" : "+pubAt",
"host" : None,
"blocked": True,
"limit" : step
- }), {
- "Origin": domain
- })
+ }))
else:
# DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
- fetched = network.post_json_api(domain, "/api/federation/instances", json.dumps({
+ fetched = network.post_json_api(domain, "/api/federation/instances", headers, json.dumps({
"sort" : "+pubAt",
"host" : None,
"blocked": True,
"limit" : step,
"offset" : offset - 1
- }), {
- "Origin": domain
- })
+ }))
# DEBUG: print("DEBUG: fetched():", len(fetched))
if len(fetched) == 0:
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from fba import config
+from fba import csrf
from fba import instances
from fba import network
print(f"DEBUG: domain='{domain}' is a PeerTube, fetching JSON ...")
peers = list()
start = 0
+
+ print(f"DEBUG: Checking CSRF for domain='{domain}'")
+ headers = csrf.determine(domain, dict())
+
for mode in ["followers", "following"]:
print(f"DEBUG: domain='{domain}',mode='{mode}'")
while True:
data = network.get_json_api(
domain,
"/api/v1/server/{mode}?start={start}&count=100",
+ headers,
(config.get("connection_timeout"), config.get("read_timeout"))
)