1 from bs4 import BeautifulSoup
2 from hashlib import sha256
# Read config.json and parse it into the module-level `config` mapping.
9 with open("config.json") as f:
10 config = json.loads(f.read())
# NOTE(review): the two strings below are elements of a collection literal whose
# opening/closing lines (and name) are not visible here — presumably a list of
# domains that get special treatment; confirm against the full file.
13 "activitypub-troll.cf",
16 "social.shrimpcam.pw",
# NOTE(review): entry of a dict literal whose braces are not visible here —
# presumably the `headers` mapping passed to the reqto calls in this file; confirm.
# The user agent string is taken from the "useragent" key of the configuration.
21 "user-agent": config["useragent"]
# Open the SQLite database file blocks.db.
24 conn = sqlite3.connect("blocks.db")
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``domain`` encoded as UTF-8."""
    hasher = sha256()
    hasher.update(domain.encode("utf-8"))
    return hasher.hexdigest()
# Request the peer list of `domain` from its /api/v1/instance/peers endpoint,
# sending the shared `headers` and giving up after 5 seconds.
# NOTE(review): the statements that return the result and that guard the request
# are not visible here, so the `-> str` annotation could not be verified.
30 def get_peers(domain: str) -> str:
32 res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
# Reported when the peers of `domain` could not be fetched.
35 print("WARNING: Cannot fetch peers:", domain)
# Determine which server software `instdomain` runs by reading its nodeinfo document.
# NOTE(review): the return statements of the first three software-name branches and
# everything after the final request are not visible here.
38 def get_type(instdomain: str) -> str:
# Try the nodeinfo endpoints in turn: 2.1.json first, then 2.0 and 2.0.json, each
# only when the previous response was a 404.
40 res = reqto.get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
41 if res.status_code == 404:
42 res = reqto.get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
43 if res.status_code == 404:
44 res = reqto.get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
# A successful response served as text/html is not a nodeinfo document; retry the
# extension-less 2.1 endpoint.
45 if res.ok and "text/html" in res.headers["content-type"]:
46 res = reqto.get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
# Software names are matched in groups; presumably each group is reported under a
# common family name — TODO confirm against the hidden return statements.
48 if res.json()["software"]["name"] in ["akkoma", "rebased"]:
50 elif res.json()["software"]["name"] in ["hometown", "ecko"]:
52 elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
# Return the software name exactly as nodeinfo reports it.
55 return res.json()["software"]["name"]
# Nodeinfo still answers 404: query the /api/v1/instance endpoint instead.
56 elif res.status_code == 404:
57 res = reqto.get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
# Store `reason` on the block row identified by blocker, blocked and block_level.
# The UPDATE only matches rows whose reason is still the empty string, so a reason
# that has already been recorded is not overwritten.
# NOTE(review): the execute call, its bound parameters and the surrounding error
# handling are not visible here.
63 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
64 # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
67 "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
# Reported when the query fails.
76 print("ERROR: failed SQL query")
# Set the last_seen value of the block row identified by blocker, blocked and
# block_level.
# NOTE(review): the execute call, its bound parameters and the surrounding error
# handling are not visible here.
79 def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
80 # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
83 "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
# Reported when the query fails.
92 print("ERROR: failed SQL query")
# Log a newly discovered block and insert it into the blocks table.
# The statement inserts one row built from six bound values — presumably blocker,
# blocked, reason, block_level, first_added and last_seen in the table's column
# order; the parameter tuple is not visible here, so confirm.
95 def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
96 print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
99 "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
# Reported when the query fails.
110 print("ERROR: failed SQL query")
# Log a newly discovered instance and insert it into the instances table.
# The statement inserts one row built from three bound values.
# NOTE(review): the values bound to the three placeholders are not visible here.
113 def add_instance(domain: str):
114 print("--- Adding new instance:", domain)
117 "INSERT INTO instances SELECT ?, ?, ?",
# Reported when the query fails.
125 print("ERROR: failed SQL query")
def send_bot_post(instance: str, blocks: dict):
    """Post a summary of the blocks applied by ``instance`` through the bot account.

    Args:
        instance: Domain of the instance that applied the blocks.
        blocks: Sequence of mappings, each with the keys ``"blocked"`` (the
            blocked domain) and ``"reason"`` (text; may be ``None`` or empty).
            It is annotated as ``dict`` but sliced and iterated like a list.

    Returns:
        ``True`` after the status has been submitted and the response decoded.

    Raises:
        Whatever ``reqto.post`` or decoding the JSON response raises.
    """
    max_entries = 20
    max_reason_length = 420

    # Keep exactly as many entries as the truncation notice announces.
    truncated = len(blocks) > max_entries
    shown = blocks[:max_entries]

    parts = [instance + " has blocked the following instances:\n\n"]
    for block in shown:
        reason = block["reason"]
        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        # Shorten on a local copy so the caller's block entries stay untouched.
        if len(reason) > max_reason_length:
            reason = reason[:max_reason_length] + "[…]"

        # A zero-width space after "@" keeps the text from forming mentions.
        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append(f"(the list has been truncated to the first {max_entries} entries)")

    message = "".join(parts)

    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    # The decoded body is not used; .json() is kept so that a non-JSON answer
    # still raises as before.
    reqto.post(
        f"{config['bot_instance']}/api/v1/statuses",
        data={"status": message, "visibility": config["bot_visibility"], "content_type": "text/plain"},
        headers=botheaders,
        timeout=10,
    ).json()

    return True
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the moderated-servers tables from the ``/about/more`` page of ``domain``.

    Each ``<h3>`` heading that names a known section (in English or one of the
    translated forms) is followed to the next ``<table>``; every row after the
    first contributes one entry.

    Args:
        domain: Host name of the instance to scrape.

    Returns:
        A dict with the keys ``"reject"``, ``"media_removal"`` and
        ``"followers_only"``, each a list of ``{"domain", "hash", "reason"}``
        dicts, or an empty dict when the page could not be fetched.
    """
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    # Section headings as shown by other versions/locales, mapped to the keys above.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        print("ERROR: Cannot fetch from domain:", domain)
        return {}

    for header in doc.find_all("h3"):
        section = translations.get(header.text, header.text)
        if section not in blocks:
            continue

        # Search everything after the heading rather than only its siblings, to
        # account for instances that e.g. hide the lists in a dropdown menu.
        table = header.find_next("table")
        if table is None:
            # Heading without a table: nothing to collect for this section.
            continue

        # The first row is skipped (presumably the column header row).
        for row in table.find_all("tr")[1:]:
            span = row.find("span")
            cells = row.find_all("td")
            if span is None or len(cells) < 2:
                # Malformed row: skip it instead of aborting the whole scrape.
                continue

            blocks[section].append(
                {
                    "domain": span.text,
                    # The first 9 characters of the title are dropped —
                    # presumably a "SHA-256: " label in front of the digest.
                    "hash": span.get("title", "")[9:],
                    "reason": cells[1].text.strip(),
                }
            )

    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
    }
# Scrape the block list published on the /friendica page of `domain`.
# NOTE(review): the parser argument, the error handling around the request and the
# return statements are not visible here.
207 def get_friendica_blocks(domain: str) -> dict:
212 reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
# The block list is the element with the id "about_blocklist".
218 blocklist = doc.find(id="about_blocklist")
220 # Prevents exceptions:
221 if blocklist is None:
222 # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
# Walk the table rows after the first one (presumably the header row); the first
# cell holds the blocked domain and the second cell the reason.
225 for line in blocklist.find("table").find_all("tr")[1:]:
228 "domain": line.find_all("td")[0].text.strip(),
229 "reason": line.find_all("td")[1].text.strip()
def _fetch_misskey_instances(domain: str, filter_key: str, flag_field: str) -> list:
    """Page through ``/api/federation/instances`` of ``domain`` for one filter.

    Args:
        domain: Host name of the Misskey instance to query.
        filter_key: Request field set to ``True`` to select the wanted
            instances (``"suspended"`` or ``"blocked"``).
        flag_field: Field of each returned entry that must be truthy for the
            entry to be kept (``"isSuspended"`` or ``"isBlocked"``).

    Returns:
        A list of ``{"domain": host, "reason": ""}`` dicts. Collection is
        best-effort: it stops at the first empty page or at the first error
        and returns whatever was gathered so far.
    """
    step = 99
    collected = []
    offset = 0

    while True:
        payload = {"sort": "+caughtAt", "host": None, filter_key: True, "limit": step}
        if offset:
            # Skip exactly the entries already fetched, so the last entry of the
            # previous page is not returned a second time.
            payload["offset"] = offset

        try:
            page = reqto.post(
                f"https://{domain}/api/federation/instances",
                data=json.dumps(payload),
                headers=headers,
                timeout=5,
            ).json()

            if not page:
                break

            for instance in page:
                # The server-side filter is double-checked on every entry.
                if instance[flag_field]:
                    collected.append(
                        {
                            "domain": instance["host"],
                            # The API offers no reason field.
                            "reason": "",
                        }
                    )
        except Exception:
            # Unreachable host or unexpected payload: stop paging, keep the rest.
            break

        offset += step

    return collected


def get_misskey_blocks(domain: str) -> dict:
    """Collect the instances that a Misskey server has suspended or blocked.

    The federation API cannot return a full list in one response, so both
    lists are fetched page by page.

    Args:
        domain: Host name of the Misskey instance to query.

    Returns:
        A dict with ``"reject"`` (instances the server marks as blocked) and
        ``"followers_only"`` (instances it marks as suspended), each a list of
        ``{"domain", "reason"}`` dicts with an empty reason.
    """
    suspended = _fetch_misskey_instances(domain, "suspended", "isSuspended")
    blocked = _fetch_misskey_instances(domain, "blocked", "isBlocked")

    return {
        "reject": blocked,
        "followers_only": suspended,
    }
# Normalise a domain string taken from a block list by applying the clean-up steps
# below in order.
# NOTE(review): the end of this function (its return statement) is not visible here.
# NOTE(review): the patterns are plain strings containing backslash escapes such as
# "\:" and "\d", which Python reports as invalid escape sequences; consider raw
# strings (r"...") after confirming the patterns stay unchanged.
300 def tidyup(domain: str) -> str:
301 # some block lists write domains in mixed case
302 domain = domain.lower()
304 # remove a trailing port number
305 domain = re.sub("\:\d+$", "", domain)
307 # remove an http(s) scheme, which is sometimes written without slashes
308 domain = re.sub("^https?\:(\/*)", "", domain)
# remove one trailing slash
311 domain = re.sub("\/$", "", domain)
# remove a leading "@"
314 domain = re.sub("^\@", "", domain)
316 # entries that target an individual user: drop everything up to and including the last "@"
317 domain = re.sub("(.+)\@", "", domain)