-with open("config.json") as f:
- config = loads(f.read())
-
-headers = {
- "user-agent": config["useragent"]
-}
-
-def send_bot_post(instance: str, blocks: dict):
- message = instance + " has blocked the following instances:\n\n"
- truncated = False
- if len(blocks) > 20:
- truncated = True
- blocks = blocks[0 : 19]
- for block in blocks:
- if block["reason"] == None or block["reason"] == '':
- message = message + block["blocked"] + " with unspecified reason\n"
- else:
- message = message + block["blocked"] + ' for "' + block["reason"] + '"\n'
- if truncated:
- message = message + "(the list has been truncated to the first 20 entries)"
-
- botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
- req = post(f"{config['bot_instance']}/api/v1/statuses",
- data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
- headers=botheaders, timeout=10).json()
- print(req)
- return True
-
-def get_mastodon_blocks(domain: str) -> dict:
- blocks = {
- "Suspended servers": [],
- "Filtered media": [],
- "Limited servers": [],
- "Silenced servers": [],
- }
-
- translations = {
- "Silenced instances": "Silenced servers",
- "Suspended instances": "Suspended servers",
- "Gesperrte Server": "Suspended servers",
- "Gefilterte Medien": "Filtered media",
- "Stummgeschaltete Server": "Silenced servers",
- "停止済みのサーバー": "Suspended servers",
- "メディアを拒否しているサーバー": "Filtered media",
- "サイレンス済みのサーバー": "Silenced servers",
- "שרתים מושעים": "Suspended servers",
- "מדיה מסוננת": "Filtered media",
- "שרתים מוגבלים": "Silenced servers",
- "Serveurs suspendus": "Suspended servers",
- "Médias filtrés": "Filtered media",
- "Serveurs limités": "Silenced servers",
- }
-
- try:
- doc = BeautifulSoup(
- get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
- "html.parser",
- )
- except:
- return {}
-
- for header in doc.find_all("h3"):
- header_text = header.text
- if header_text in translations:
- header_text = translations[header_text]
- if header_text in blocks:
- # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
- for line in header.find_all_next("table")[0].find_all("tr")[1:]:
- blocks[header_text].append(
- {
- "domain": line.find("span").text,
- "hash": line.find("span")["title"][9:],
- "reason": line.find_all("td")[1].text.strip(),
- }
- )
- return {
- "reject": blocks["Suspended servers"],
- "media_removal": blocks["Filtered media"],
- "followers_only": blocks["Limited servers"]
- + blocks["Silenced servers"],
- }
-
-def get_friendica_blocks(domain: str) -> dict:
- blocks = []
-
- try:
- doc = BeautifulSoup(
- get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
- "html.parser",
- )
- except:
- return {}
-
- blocklist = doc.find(id="about_blocklist")
- for line in blocklist.find("table").find_all("tr")[1:]:
- blocks.append(
- {
- "domain": line.find_all("td")[0].text.strip(),
- "reason": line.find_all("td")[1].text.strip()
- }
- )
-
- return {
- "reject": blocks
- }
-
-def get_pisskey_blocks(domain: str) -> dict:
- blocks = {
- "suspended": [],
- "blocked": []
- }
-
- try:
- counter = 0
- step = 99
- while True:
- # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
- try:
- if counter == 0:
- doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
- if doc == []: raise
- else:
- doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
- if doc == []: raise
- for instance in doc:
- # just in case
- if instance["isSuspended"]:
- blocks["suspended"].append(
- {
- "domain": instance["host"],
- # no reason field, nothing
- "reason": ""
- }
- )
- counter = counter + step
- except:
- counter = 0
- break
-
- while True:
- # same shit, different asshole ("blocked" aka full suspend)
- try:
- if counter == 0:
- doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
- if doc == []: raise
- else:
- doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
- if doc == []: raise
- for instance in doc:
- if instance["isBlocked"]:
- blocks["blocked"].append(
- {
- "domain": instance["host"],
- "reason": ""
- }
- )
- counter = counter + step
- except:
- counter = 0
- break
-
- return {
- "reject": blocks["blocked"],
- "followers_only": blocks["suspended"]
- }
-
- except:
- return {}
-
-def get_hash(domain: str) -> str:
- return sha256(domain.encode("utf-8")).hexdigest()
-
-
-def get_type(domain: str) -> str:
- try:
- res = get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
- if res.status_code == 404:
- res = get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
- if res.status_code == 404:
- res = get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
- if res.ok and "text/html" in res.headers["content-type"]:
- res = get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
- if res.ok:
- if res.json()["software"]["name"] in ["akkoma", "rebased"]:
- return "pleroma"
- elif res.json()["software"]["name"] in ["hometown", "ecko"]:
- return "mastodon"
- elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
- return "misskey"
- else:
- return res.json()["software"]["name"]
- elif res.status_code == 404:
- res = get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
- if res.ok:
- return "mastodon"
- except:
- return None
-
-def tidyup(domain: str) -> str:
- # some retards put their blocks in variable case
- domain = domain.lower()
- # other retards put the port
- domain = re.sub("\:\d+$", "", domain)
- # bigger retards put the schema in their blocklist, sometimes even without slashes
- domain = re.sub("^https?\:(\/*)", "", domain)
- # and trailing slash
- domain = re.sub("\/$", "", domain)
- # and the @
- domain = re.sub("^\@", "", domain)
- # the biggest retards of them all try to block individual users
- domain = re.sub("(.+)\@", "", domain)
- return domain
-
-conn = sqlite3.connect("blocks.db")
-c = conn.cursor()
-
-c.execute(
- #"select domain, software from instances where software in ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial')"
- "select domain, software from instances where domain = 'mstdn.social'"