@router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
def info():
fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
- known, indexed, blocks, errorous = fba.cursor.fetchone()
+ known, indexed, blocklist, errorous = fba.cursor.fetchone()
return {
"known_instances" : known,
"indexed_instances" : indexed,
- "blocks_recorded" : blocks,
+ "blocks_recorded" : blocklist,
"errorous_instances": errorous,
"slogan" : config.get("slogan")
}
else:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
- blocks = fba.cursor.fetchall()
+ blocklist = fba.cursor.fetchall()
result = {}
- for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
+ for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
entry = {
"blocker" : blocker,
"blocked" : blocked,
raise HTTPException(status_code=response.status_code, detail=response.text)
info = response.json()
- blocks = None
+ response = None
if domain != None:
if not validators.domain(domain):
raise HTTPException(status_code=500, detail="Invalid domain")
- blocks = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
elif reason != None:
- blocks = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
elif reverse != None:
if not validators.domain(reverse):
raise HTTPException(status_code=500, detail="Invalid domain")
- blocks = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
- if blocks != None:
- if not blocks.ok:
- raise HTTPException(status_code=blocks.status_code, detail=blocks.text)
- blocks = blocks.json()
- for block_level in blocks:
- for block in blocks[block_level]:
+ if response != None:
+ if not response.ok:
+ raise HTTPException(status_code=response.status_code, detail=response.text)
+ blocklist = response.json()
+ for block_level in blocklist:
+ for block in blocklist[block_level]:
block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
return templates.TemplateResponse("index.html", {
"request": request,
"domain" : domain,
- "blocks" : blocks,
+ "blocks" : blocklist,
"reason" : reason,
"reverse": reverse,
"info" : info
result = fba.cursor.fetchall()
- blocks = []
+ blocklist = []
for blocker, blocked, block_level, reason, first_seen, last_seen in result:
first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
if reason == None or reason == '':
else:
reason = "Provided reason: '" + reason + "'"
- blocks.append({
+ blocklist.append({
"blocker" : blocker,
"blocked" : blocked,
"block_level": block_level,
"timestamp": utils.format_datetime(datetime.now()),
"domain" : domain,
"hostname" : config.get("hostname"),
- "blocks" : blocks
+ "blocks" : blocklist
}, headers={
"Content-Type": "routerlication/rss+xml"
})
json = fba.fetch_misskey_blocks(blocker)
print(f"INFO: Checking {len(json.items())} entries from blocker='{blocker}',software='{software}' ...")
- for block_level, blocks in json.items():
- # DEBUG: print("DEBUG: blocker,block_level,blocks():", blocker, block_level, len(blocks))
+ for block_level, blocklist in json.items():
+ # DEBUG: print("DEBUG: blocker,block_level,blocklist():", blocker, block_level, len(blocklist))
block_level = fba.tidyup_domain(block_level)
# DEBUG: print("DEBUG: AFTER-block_level:", block_level)
if block_level == "":
print("WARNING: block_level is empty, blocker:", blocker)
continue
- # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
- for block in blocks:
+ # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
+ for block in blocklist:
blocked, reason = block.values()
# DEBUG: print("DEBUG: BEFORE blocked:", blocked)
blocked = fba.tidyup_domain(blocked)
})
else:
# DEBUG: print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocked='{blocked}' ...")
- fba.update_last_seen(blocker, blocked, block_level)
+ blocks.update_last_seen(blocker, blocked, block_level)
blocks.update_reason(reason, blocker, blocked, block_level)
# DEBUG: print("DEBUG: Committing changes ...")
})
else:
# DEBUG: print(f"DEBUG: Updating block last seen for blocker='{blocker}',blocked='{blocked}' ...")
- fba.update_last_seen(blocker, blocked, "reject")
+ blocks.update_last_seen(blocker, blocked, "reject")
if "public_comment" in peer:
# DEBUG: print("DEBUG: Updating block reason:", blocker, blocked, peer["public_comment"])
# DEBUG: print("DEBUG: EXIT!")
-def send_bot_post(instance: str, blocks: dict):
- # DEBUG: print(f"DEBUG: instance={instance},blocks()={len(blocks)} - CALLED!")
+def send_bot_post(instance: str, blocklist: dict):
+ # DEBUG: print(f"DEBUG: instance={instance},blocklist()={len(blocklist)} - CALLED!")
if type(domain) != str:
raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
- elif type(blocks) != dict:
- raise ValueError(f"Parameter blocks[]='{type(blocks)}' is not 'dict'")
+ elif type(blocklist) != dict:
+ raise ValueError(f"Parameter blocklist[]='{type(blocklist)}' is not 'dict'")
message = instance + " has blocked the following instances:\n\n"
truncated = False
- if len(blocks) > 20:
+ if len(blocklist) > 20:
truncated = True
- blocks = blocks[0 : 19]
+ blocklist = blocklist[0 : 19]
- # DEBUG: print(f"DEBUG: blocks()={len(blocks)}")
- for block in blocks:
+ # DEBUG: print(f"DEBUG: blocklist()={len(blocklist)}")
+ for block in blocklist:
# DEBUG: print(f"DEBUG: block['{type(block)}']={block}")
if block["reason"] == None or block["reason"] == '':
message = message + block["blocked"] + " with unspecified reason\n"
raise ValueError(f"Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
- blocks = []
+ blocklist = []
try:
doc = bs4.BeautifulSoup(
for line in blocklist.find("table").find_all("tr")[1:]:
# DEBUG: print(f"DEBUG: line='{line}'")
- blocks.append({
+ blocklist.append({
"domain": tidyup_domain(line.find_all("td")[0].text),
"reason": tidyup_domain(line.find_all("td")[1].text)
})
- # DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
+ # DEBUG: print("DEBUG: Returning blocklist() for domain:", domain, len(blocklist))
return {
- "reject": blocks
+ "reject": blocklist
}
def fetch_misskey_blocks(domain: str) -> dict:
raise ValueError(f"Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
- blocks = {
+ blocklist = {
"suspended": [],
"blocked" : []
}
count = 0
for instance in fetched:
# Is it there?
- if instance["isSuspended"] and not has_key(blocks["suspended"], "domain", instance):
+ if instance["isSuspended"] and not has_key(blocklist["suspended"], "domain", instance):
count = count + 1
- blocks["suspended"].append(
+ blocklist["suspended"].append(
{
"domain": tidyup_domain(instance["host"]),
# no reason field, nothing
count = 0
for instance in fetched:
# Is it there?
- if instance["isBlocked"] and not has_key(blocks["blocked"], "domain", instance):
+ if instance["isBlocked"] and not has_key(blocklist["blocked"], "domain", instance):
count = count + 1
- blocks["blocked"].append({
+ blocklist["blocked"].append({
"domain": tidyup_domain(instance["host"]),
"reason": None
})
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
instances.update_last_instance_fetch(domain)
- # DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
+ # DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocklist["blocked"]), len(blocklist["suspended"]))
return {
- "reject" : blocks["blocked"],
- "followers_only": blocks["suspended"]
+ "reject" : blocklist["blocked"],
+ "followers_only": blocklist["suspended"]
}
def tidyup_reason(reason: str) -> str:
raise ValueError(f"Parameter 'domain' is empty")
# DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
- blocks = {
+ blocklist = {
"Suspended servers": [],
"Filtered media" : [],
"Limited servers" : [],
else:
print(f"WARNING: header_text='{header_text}' not found in language mapping table")
- if header_text in blocks or header_text.lower() in blocks:
+ if header_text in blocklist or header_text.lower() in blocklist:
# replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocks[header_text].append(
+ blocklist[header_text].append(
{
"domain": fba.tidyup_domain(line.find("span").text),
"hash" : fba.tidyup_domain(line.find("span")["title"][9:]),
}
)
else:
- print(f"WARNING: header_text='{header_text}' not found in blocks()={len(blocks)}")
+ print(f"WARNING: header_text='{header_text}' not found in blocklist()={len(blocklist)}")
- # DEBUG: print("DEBUG: Returning blocks for domain:", domain)
+ # DEBUG: print("DEBUG: Returning blocklist for domain:", domain)
return {
- "reject" : blocks["Suspended servers"],
- "media_removal" : blocks["Filtered media"],
- "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
+ "reject" : blocklist["Suspended servers"],
+ "media_removal" : blocklist["Filtered media"],
+ "followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
}
def fetch_blocks(domain: str, origin: str, nodeinfo_url: str):
reqheaders = fba.api_headers
# DEBUG: print("DEBUG: Querying API domain_blocks:", domain)
- blocks = fba.get_response(domain, "/api/v1/instance/domain_blocks", reqheaders, (config.get("connection_timeout"), config.get("read_timeout"))).json()
+ blocklist = fba.get_response(domain, "/api/v1/instance/domain_blocks", reqheaders, (config.get("connection_timeout"), config.get("read_timeout"))).json()
- print(f"INFO: Checking {len(blocks)} entries from domain='{domain}',software='mastodon' ...")
- for block in blocks:
+ print(f"INFO: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon' ...")
+ for block in blocklist:
entry = {
'domain': block['domain'],
'hash' : block['digest'],
json = fetch_blocks_from_about(domain)
print(f"INFO: Checking {len(json.items())} entries from domain='{domain}',software='mastodon' ...")
- for block_level, blocks in json.items():
- # DEBUG: print("DEBUG: domain,block_level,blocks():", domain, block_level, len(blocks))
+ for block_level, blocklist in json.items():
+ # DEBUG: print("DEBUG: domain,block_level,blocklist():", domain, block_level, len(blocklist))
block_level = fba.tidyup_domain(block_level)
# DEBUG: print("DEBUG: AFTER-block_level:", block_level)
print("WARNING: block_level is empty, domain:", domain)
continue
- # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from domain='{domain}',software='mastodon',block_level='{block_level}' ...")
- for block in blocks:
+ # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',software='mastodon',block_level='{block_level}' ...")
+ for block in blocklist:
blocked, blocked_hash, reason = block.values()
# DEBUG: print("DEBUG: blocked,hash,reason:", blocked, blocked_hash, reason)
blocked = fba.tidyup_domain(blocked)
})
else:
# DEBUG: print(f"DEBUG: Updating block last seen and reason for domain='{domain}',blocking='{blocking}' ...")
- fba.update_last_seen(domain, blocking, block_level)
+ blocks.update_last_seen(domain, blocking, block_level)
blocks.update_reason(reason, domain, blocking, block_level)
# DEBUG: print("DEBUG: Committing changes ...")
return
if "mrf_simple" in federation:
- for block_level, blocks in (
+ for block_level, blocklist in (
{**federation["mrf_simple"],
**{"quarantined_instances": federation["quarantined_instances"]}}
).items():
- # DEBUG: print("DEBUG: block_level, blocks():", block_level, len(blocks))
+ # DEBUG: print("DEBUG: block_level, blocklist():", block_level, len(blocklist))
block_level = fba.tidyup_domain(block_level)
# DEBUG: print("DEBUG: BEFORE block_level:", block_level)
print("WARNING: block_level is now empty!")
continue
- # DEBUG: print(f"DEBUG: Checking {len(blocks)} entries from domain='{domain}',block_level='{block_level}' ...")
- for blocked in blocks:
+ # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from domain='{domain}',block_level='{block_level}' ...")
+ for blocked in blocklist:
# DEBUG: print("DEBUG: BEFORE blocked:", blocked)
blocked = fba.tidyup_domain(blocked)
# DEBUG: print("DEBUG: AFTER blocked:", blocked)
})
else:
# DEBUG: print(f"DEBUG: Updating block last seen for domain='{domain}',blocked='{blocked}' ...")
- fba.update_last_seen(domain, blocked, block_level)
+ blocks.update_last_seen(domain, blocked, block_level)
# DEBUG: print("DEBUG: Committing changes ...")
fba.connection.commit()