- # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
- return json
-
-def fetch_generator_from_path(domain: str, path: str = "/") -> str:
- # NOISY-DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!")
- software = None
-
- try:
- # NOISY-DEBUG: print(f"DEBUG: Fetching '{path}' from '{domain}' ...")
- res = reqto.get(f"https://{domain}{path}", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
-
- # NOISY-DEBUG: print("DEBUG: domain,res.ok,res.status_code:", domain, res.ok, res.status_code)
- if res.ok and res.status_code < 300 and len(res.text) > 0:
- # NOISY-DEBUG: print("DEBUG: Search for <meta name='generator'>:", domain)
- doc = bs4.BeautifulSoup(res.text, "html.parser")
-
- # NOISY-DEBUG: print("DEBUG: doc[]:", type(doc))
- tag = doc.find("meta", {"name": "generator"})
-
- # NOISY-DEBUG: print(f"DEBUG: tag[{type(tag)}: {tag}")
- if isinstance(tag, bs4.element.Tag):
- # NOISY-DEBUG: print("DEBUG: Found generator meta tag: ", domain)
- software = tidyup(tag.get("content"))
- # NOISY-DEBUG: print(f"DEBUG: software='{software}'")
- remove_pending_error(domain)
-
- except BaseException as e:
- print(f"WARNING: Cannot fetch / from '{domain}':", e)
- update_last_error(domain, e)
- pass
-
- # NOISY-DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
- return software
-
-def determine_software(domain: str) -> str:
- # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
- software = None
-
- # NOISY-DEBUG: print(f"DEBUG: Fetching nodeinfo from '{domain}' ...")
- json = fetch_nodeinfo(domain)
-
- # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
- if not isinstance(json, dict) or len(json) == 0:
- # NOISY-DEBUG: print("DEBUG: Could not determine software type:", domain)
- return fetch_generator_from_path(domain)
-
- # NOISY-DEBUG: print("DEBUG: json():", len(json), json)
- if "status" in json and json["status"] == "error" and "message" in json:
- print("WARNING: JSON response is an error:", json["message"])
- update_last_error(domain, json["message"])
- return fetch_generator_from_path(domain)
- elif "software" not in json or "name" not in json["software"]:
- # NOISY-DEBUG: print(f"DEBUG: JSON response from {domain} does not include [software][name], fetching / ...")
- software = fetch_generator_from_path(domain)
-
- # NOISY-DEBUG: print(f"DEBUG: Generator for domain='{domain}' is: {software}, EXIT!")
- return software
-
- software = tidyup(json["software"]["name"])
-
- # NOISY-DEBUG: print("DEBUG: sofware after tidyup():", software)
- if software in ["akkoma", "rebased"]:
- # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, software)
- software = "pleroma"
- elif software in ["hometown", "ecko"]:
- # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, software)
- software = "mastodon"
- elif software in ["calckey", "groundpolis", "foundkey", "cherrypick", "meisskey"]:
- # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, software)
- software = "misskey"
- elif software.find("/") > 0:
- print("WARNING: Spliting of slash:", software)
- software = software.split("/")[-1];
- elif software.find("|") > 0:
- print("WARNING: Spliting of pipe:", software)
- software = tidyup(software.split("|")[0]);
-
- # NOISY-DEBUG: print(f"DEBUG: software[]={type(software)}")
- if software == "":
- print("WARNING: tidyup() left no software name behind:", domain)
- software = None
-
- # NOISY-DEBUG: print(f"DEBUG: software[]={type(software)}")
- if str(software) == "":
- # NOISY-DEBUG: print(f"DEBUG: software for '{domain}' was not detected, trying generator ...")
- software = fetch_generator_from_path(domain)
-
- # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
- return software
-
-def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
- # NOISY-DEBUG: print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
- try:
- cursor.execute(
- "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
- (
- reason,
- time.time(),
- blocker,
- blocked,
- block_level
- ),
- )
-
- # NOISY-DEBUG: print(f"DEBUG: cursor.rowcount={cursor.rowcount}")
- if cursor.rowcount == 0:
- print("WARNING: Did not update any rows:", domain)
-
- except BaseException as e:
- print("ERROR: failed SQL query:", reason, blocker, blocked, block_level, e)
- sys.exit(255)
-
- # NOISY-DEBUG: print("DEBUG: EXIT!")
-
-def update_last_seen(blocker: str, blocked: str, block_level: str):
- # NOISY-DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
- try:
- cursor.execute(
- "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
- (
- time.time(),
- blocker,
- blocked,
- block_level
- )
- )
-
- if cursor.rowcount == 0:
- print("WARNING: Did not update any rows:", domain)
-
- except BaseException as e:
- print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level, e)
- sys.exit(255)
-
- # NOISY-DEBUG: print("DEBUG: EXIT!")
-
-def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
- # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
- if not validators.domain(blocker):
- print("WARNING: Bad blocker:", blocker)
- raise
- elif not validators.domain(blocked):
- print("WARNING: Bad blocked:", blocked)
- raise
-
- print("INFO: New block:", blocker, blocked, reason, block_level, first_added, last_seen)
- try:
- cursor.execute(
- "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
- (
- blocker,
- blocked,
- reason,
- block_level,
- time.time(),
- time.time()
- ),
- )
-
- except BaseException as e:
- print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen, e)
- sys.exit(255)
-
- # NOISY-DEBUG: print("DEBUG: EXIT!")
-
-def add_instance(domain: str, origin: str, originator: str):
- # NOISY-DEBUG: print("DEBUG: domain,origin:", domain, origin, originator)
- if not validators.domain(domain):
- print("WARNING: Bad domain name:", domain)
- raise
- elif origin is not None and not validators.domain(origin):
- print("WARNING: Bad origin name:", origin)
- raise
-
- software = determine_software(domain)
- # NOISY-DEBUG: print("DEBUG: Determined software:", software)
-
- print(f"INFO: Adding new instance {domain} (origin: {origin})")
- try:
- cursor.execute(
- "INSERT INTO instances (domain, origin, originator, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
- (
- domain,
- origin,
- originator,
- get_hash(domain),
- software,
- time.time()
- ),
- )
-
- if domain in nodeinfos["nodeinfo_url"]:
- # NOISY-DEBUG # NOISY-DEBUG: print("DEBUG: domain has pending nodeinfo being updated:", domain)
- update_nodeinfos(domain)
- remove_pending_error(domain)
- elif domain in pending_errors:
- # NOISY-DEBUG: print("DEBUG: domain has pending error being updated:", domain)
- update_last_error(domain, pending_errors[domain])
- remove_pending_error(domain)
-
- except BaseException as e:
- print("ERROR: failed SQL query:", domain, e)
- sys.exit(255)
- else:
- # NOISY-DEBUG: print("DEBUG: Updating nodeinfo for domain:", domain)
- update_last_nodeinfo(domain)
-
- # NOISY-DEBUG: print("DEBUG: EXIT!")
-
-def send_bot_post(instance: str, blocks: dict):
- message = instance + " has blocked the following instances:\n\n"
- truncated = False
-
- if len(blocks) > 20:
- truncated = True
- blocks = blocks[0 : 19]
-
- for block in blocks:
- if block["reason"] == None or block["reason"] == '':
- message = message + block["blocked"] + " with unspecified reason\n"
- else:
- if len(block["reason"]) > 420:
- block["reason"] = block["reason"][0:419] + "[…]"
-
- message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
-
- if truncated:
- message = message + "(the list has been truncated to the first 20 entries)"
-
- botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
-
- req = reqto.post(
- f"{config['bot_instance']}/api/v1/statuses",
- data={
- "status" : message,
- "visibility" : config['bot_visibility'],
- "content_type": "text/plain"
- },
- headers=botheaders,
- timeout=10
- ).json()
-
- return True
-
-def get_mastodon_blocks(domain: str) -> dict:
- # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
- blocks = {
- "Suspended servers": [],
- "Filtered media" : [],
- "Limited servers" : [],
- "Silenced servers" : [],
- }
-
- try:
- doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/about/more", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
- "html.parser",
- )
- except BaseException as e:
- print("ERROR: Cannot fetch from domain:", domain, e)
- update_last_error(domain, e)
- return {}
-
- for header in doc.find_all("h3"):
- header_text = tidyup(header.text)
-
- if header_text in language_mapping:
- # NOISY-DEBUG: print(f"DEBUG: header_text='{header_text}'")
- header_text = language_mapping[header_text]
-
- if header_text in blocks or header_text.lower() in blocks:
- # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
- for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocks[header_text].append(
- {
- "domain": tidyup(line.find("span").text),
- "hash" : tidyup(line.find("span")["title"][9:]),
- "reason": tidyup(line.find_all("td")[1].text),
- }
- )
-
- # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
- return {
- "reject" : blocks["Suspended servers"],
- "media_removal" : blocks["Filtered media"],
- "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
- }
-
-def get_friendica_blocks(domain: str) -> dict:
- # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
- blocks = []
-
- try:
- doc = bs4.BeautifulSoup(
- reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
- "html.parser",
- )
- except BaseException as e:
- print("WARNING: Failed to fetch /friendica from domain:", domain, e)
- update_last_error(domain, e)
- return {}
-
- blocklist = doc.find(id="about_blocklist")
-
- # Prevents exceptions:
- if blocklist is None:
- # NOISY-DEBUG: print("DEBUG:Instance has no block list:", domain)
- return {}
-
- for line in blocklist.find("table").find_all("tr")[1:]:
- blocks.append({
- "domain": tidyup(line.find_all("td")[0].text),
- "reason": tidyup(line.find_all("td")[1].text)
- })
-
- # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
- return {
- "reject": blocks
- }
-
-def get_misskey_blocks(domain: str) -> dict:
- # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
- blocks = {
- "suspended": [],
- "blocked" : []
- }
-
- counter = 0
- step = 99
- while True:
- # iterating through all "suspended" (follow-only in its terminology)
- # instances page-by-page, since that troonware doesn't support
- # sending them all at once
- try:
- if counter == 0:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "suspended": True,
- "limit" : step
- }))
- else:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "suspended": True,
- "limit" : step,
- "offset" : counter-1
- }))
-
- # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
- if len(doc) == 0:
- # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
- break
-
- for instance in doc:
- # just in case
- if instance["isSuspended"]:
- blocks["suspended"].append(
- {
- "domain": tidyup(instance["host"]),
- # no reason field, nothing
- "reason": ""
- }
- )
-
- if len(doc) < step:
- # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
- break
-
- # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
- counter = counter + step
-
- except BaseException as e:
- print("WARNING: Caught error, exiting loop:", domain, e)
- update_last_error(domain, e)
- counter = 0
- break
-
- while True:
- # same shit, different asshole ("blocked" aka full suspend)
- try:
- if counter == 0:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain,"/api/federation/instances", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "blocked": True,
- "limit" : step
- }))
- else:
- # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
- doc = post_json_api(domain,"/api/federation/instances", json.dumps({
- "sort" : "+caughtAt",
- "host" : None,
- "blocked": True,
- "limit" : step,
- "offset" : counter-1
- }))
-
- # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
- if len(doc) == 0:
- # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
- break
-
- for instance in doc:
- if instance["isBlocked"]:
- blocks["blocked"].append({
- "domain": tidyup(instance["host"]),
- "reason": ""
- })
-
- if len(doc) < step:
- # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
- break
-
- # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
- counter = counter + step
-
- except BaseException as e:
- print("ERROR: Exception during POST:", domain, e)
- update_last_error(domain, e)
- counter = 0
- break
-
- # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
- return {
- "reject" : blocks["blocked"],
- "followers_only": blocks["suspended"]
- }
-
-def tidyup(string: str) -> str:
- # some retards put their blocks in variable case
- string = string.lower().strip()
-
- # other retards put the port
- string = re.sub("\:\d+$", "", string)
-
- # bigger retards put the schema in their blocklist, sometimes even without slashes
- string = re.sub("^https?\:(\/*)", "", string)