c = conn.cursor()
def get_hash(domain: str) -> str:
+ # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
return sha256(domain.encode("utf-8")).hexdigest()
def get_peers(domain: str) -> str:
+ # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
+ peers = None
+
try:
res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
- return res.json()
+ peers = res.json()
except:
print("WARNING: Cannot fetch peers:", domain, res.status_code)
- return None
-def get_type(instdomain: str) -> str:
+ # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
+ return peers
+
+def post_json_api(domain: str, path: str, data: str) -> str:
+ # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data():", domain, path, len(data))
+ doc = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5).json()
+
+ if doc == []:
+ print("WARNING: Cannot query JSON API:", domain, path)
+ raise
+
+ # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
+ return doc
+
+def determine_software(domain: str) -> str:
+ # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
+ software = None
try:
- res = reqto.get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
+ res = reqto.get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
if res.status_code == 404:
- res = reqto.get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
+ res = reqto.get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
if res.status_code == 404:
- res = reqto.get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
+ res = reqto.get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
if res.ok and "text/html" in res.headers["content-type"]:
- res = reqto.get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
+ res = reqto.get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
if res.ok:
if res.json()["software"]["name"] in ["akkoma", "rebased"]:
- return "pleroma"
+ software = "pleroma"
elif res.json()["software"]["name"] in ["hometown", "ecko"]:
- return "mastodon"
+ software = "mastodon"
elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
- return "misskey"
+ software = "misskey"
else:
- return res.json()["software"]["name"]
+ software = res.json()["software"]["name"]
elif res.status_code == 404:
- res = reqto.get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
+ res = reqto.get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
if res.ok:
- return "mastodon"
+ software = "mastodon"
except:
- return None
+ print("WARNING: Failed fetching instance meta data")
+
+ # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
+ return software
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
# NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
block_level
),
)
+
except:
print("ERROR: failed SQL query")
sys.exit(255)
block_level
)
)
+
except:
print("ERROR: failed SQL query")
sys.exit(255)
last_seen
),
)
+
except:
print("ERROR: failed SQL query")
sys.exit(255)
(
domain,
get_hash(domain),
- get_type(domain)
+ determine_software(domain)
),
)
+
except:
print("ERROR: failed SQL query")
sys.exit(255)
else:
if len(block["reason"]) > 420:
block["reason"] = block["reason"][0:419] + "[…]"
+
message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
+
if truncated:
message = message + "(the list has been truncated to the first 20 entries)"
botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
+
req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
headers=botheaders, timeout=10).json()
+
return True
def get_mastodon_blocks(domain: str) -> dict:
+ # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
blocks = {
"Suspended servers": [],
"Filtered media": [],
for header in doc.find_all("h3"):
header_text = header.text
+
if header_text in translations:
header_text = translations[header_text]
+
if header_text in blocks:
# replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
for line in header.find_all_next("table")[0].find_all("tr")[1:]:
"reason": line.find_all("td")[1].text.strip(),
}
)
+
+ # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
return {
"reject": blocks["Suspended servers"],
"media_removal": blocks["Filtered media"],
- "followers_only": blocks["Limited servers"]
- + blocks["Silenced servers"],
+ "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
}
def get_friendica_blocks(domain: str) -> dict:
+ # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
blocks = []
try:
"html.parser",
)
except:
+ print("WARNING: Failed to fetch /friendica from domain:", domain)
return {}
blocklist = doc.find(id="about_blocklist")
return {}
for line in blocklist.find("table").find_all("tr")[1:]:
- blocks.append(
- {
- "domain": line.find_all("td")[0].text.strip(),
- "reason": line.find_all("td")[1].text.strip()
- }
- )
+ blocks.append({
+ "domain": line.find_all("td")[0].text.strip(),
+ "reason": line.find_all("td")[1].text.strip()
+ })
+ # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
return {
"reject": blocks
}
def get_misskey_blocks(domain: str) -> dict:
+ # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
blocks = {
"suspended": [],
"blocked": []
counter = 0
step = 99
while True:
- # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
+ # iterating through all "suspended" (follow-only in its terminology)
+ # instances page-by-page, since that troonware doesn't support
+ # sending them all at once
try:
if counter == 0:
- doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
- if doc == []: raise
+ doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
+ "sort": "+caughtAt",
+ "host": None,
+ "suspended": True,
+ "limit": step
+ }))
else:
- doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
- if doc == []: raise
+ doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
+ "sort": "+caughtAt",
+ "host": None,
+ "suspended": True,
+ "limit": step,
+ "offset": counter-1
+ }))
+
for instance in doc:
# just in case
if instance["isSuspended"]:
# same shit, different asshole ("blocked" aka full suspend)
try:
if counter == 0:
- doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
- if doc == []: raise
+ doc = post_json_api(domain,"/api/federation/instances", json.dumps({
+ "sort": "+caughtAt",
+ "host": None,
+ "blocked": True,
+ "limit": step
+ }))
else:
- doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
- if doc == []: raise
+ doc = post_json_api(domain,"/api/federation/instances", json.dumps({
+ "sort": "+caughtAt",
+ "host": None,
+ "blocked": True,
+ "limit": step,
+ "offset": counter-1
+ }))
+
for instance in doc:
if instance["isBlocked"]:
- blocks["blocked"].append(
- {
+ blocks["blocked"].append({
"domain": instance["host"],
"reason": ""
- }
- )
+ })
counter = counter + step
+
except:
counter = 0
break
+ # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
return {
"reject": blocks["blocked"],
"followers_only": blocks["suspended"]
}
except:
+ print("WARNING: API request failed for domain:", domain)
return {}
def tidyup(domain: str) -> str: