1 from bs4 import BeautifulSoup
2 from hashlib import sha256
10 with open("config.json") as f:
11 config = json.loads(f.read())
14 "activitypub-troll.cf",
17 "social.shrimpcam.pw",
22 "user-agent": config["useragent"]
25 conn = sqlite3.connect("blocks.db")
28 def get_hash(domain: str) -> str:
29 # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
30 return sha256(domain.encode("utf-8")).hexdigest()
32 def get_peers(domain: str) -> str:
33 # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
37 res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
40 print("WARNING: Cannot fetch peers:", domain, res.status_code)
42 # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
45 def post_json_api(domain: str, path: str, data: str) -> str:
46 # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data():", domain, path, len(data))
47 doc = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5).json()
50 print("WARNING: Cannot query JSON API:", domain, path)
53 # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
56 def determine_software(domain: str) -> str:
57 # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
60 res = reqto.get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
61 if res.status_code == 404:
62 res = reqto.get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
63 if res.status_code == 404:
64 res = reqto.get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
65 if res.ok and "text/html" in res.headers["content-type"]:
66 res = reqto.get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
68 if res.json()["software"]["name"] in ["akkoma", "rebased"]:
70 elif res.json()["software"]["name"] in ["hometown", "ecko"]:
72 elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
75 software = res.json()["software"]["name"]
76 elif res.status_code == 404:
77 res = reqto.get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
81 print("WARNING: Failed fetching instance meta data")
83 # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
86 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
87 # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
90 "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
100 print("ERROR: failed SQL query")
103 def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
104 # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
107 "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
117 print("ERROR: failed SQL query")
120 def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
121 print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
124 "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
136 print("ERROR: failed SQL query")
139 def add_instance(domain: str):
140 print("--- Adding new instance:", domain)
143 "INSERT INTO instances SELECT ?, ?, ?",
147 determine_software(domain)
152 print("ERROR: failed SQL query")
155 def send_bot_post(instance: str, blocks: dict):
156 message = instance + " has blocked the following instances:\n\n"
161 blocks = blocks[0 : 19]
164 if block["reason"] == None or block["reason"] == '':
165 message = message + block["blocked"] + " with unspecified reason\n"
167 if len(block["reason"]) > 420:
168 block["reason"] = block["reason"][0:419] + "[…]"
170 message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
173 message = message + "(the list has been truncated to the first 20 entries)"
175 botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
177 req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
178 data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
179 headers=botheaders, timeout=10).json()
183 def get_mastodon_blocks(domain: str) -> dict:
184 # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
186 "Suspended servers": [],
187 "Filtered media": [],
188 "Limited servers": [],
189 "Silenced servers": [],
193 "Silenced instances": "Silenced servers",
194 "Suspended instances": "Suspended servers",
195 "Gesperrte Server": "Suspended servers",
196 "Gefilterte Medien": "Filtered media",
197 "Stummgeschaltete Server": "Silenced servers",
198 "停止済みのサーバー": "Suspended servers",
199 "メディアを拒否しているサーバー": "Filtered media",
200 "サイレンス済みのサーバー": "Silenced servers",
201 "שרתים מושעים": "Suspended servers",
202 "מדיה מסוננת": "Filtered media",
203 "שרתים מוגבלים": "Silenced servers",
204 "Serveurs suspendus": "Suspended servers",
205 "Médias filtrés": "Filtered media",
206 "Serveurs limités": "Silenced servers",
211 reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
215 print("ERROR: Cannot fetch from domain:", domain)
218 for header in doc.find_all("h3"):
219 header_text = header.text
221 if header_text in translations:
222 header_text = translations[header_text]
224 if header_text in blocks:
225 # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
226 for line in header.find_all_next("table")[0].find_all("tr")[1:]:
227 blocks[header_text].append(
229 "domain": line.find("span").text,
230 "hash": line.find("span")["title"][9:],
231 "reason": line.find_all("td")[1].text.strip(),
235 # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
237 "reject": blocks["Suspended servers"],
238 "media_removal": blocks["Filtered media"],
239 "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
242 def get_friendica_blocks(domain: str) -> dict:
243 # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
248 reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
252 print("WARNING: Failed to fetch /friendica from domain:", domain)
255 blocklist = doc.find(id="about_blocklist")
257 # Prevents exceptions:
258 if blocklist is None:
259 # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
262 for line in blocklist.find("table").find_all("tr")[1:]:
264 "domain": line.find_all("td")[0].text.strip(),
265 "reason": line.find_all("td")[1].text.strip()
268 # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
273 def get_misskey_blocks(domain: str) -> dict:
274 # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
284 # iterating through all "suspended" (follow-only in its terminology)
285 # instances page-by-page, since that troonware doesn't support
286 # sending them all at once
289 doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
296 doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
306 if instance["isSuspended"]:
307 blocks["suspended"].append(
309 "domain": instance["host"],
310 # no reason field, nothing
314 counter = counter + step
320 # same shit, different asshole ("blocked" aka full suspend)
323 doc = post_json_api(domain,"/api/federation/instances", json.dumps({
330 doc = post_json_api(domain,"/api/federation/instances", json.dumps({
339 if instance["isBlocked"]:
340 blocks["blocked"].append({
341 "domain": instance["host"],
344 counter = counter + step
350 # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
352 "reject": blocks["blocked"],
353 "followers_only": blocks["suspended"]
357 print("WARNING: API request failed for domain:", domain)
360 def tidyup(domain: str) -> str:
361 # some retards put their blocks in variable case
362 domain = domain.lower()
364 # other retards put the port
365 domain = re.sub("\:\d+$", "", domain)
367 # bigger retards put the schema in their blocklist, sometimes even without slashes
368 domain = re.sub("^https?\:(\/*)", "", domain)
371 domain = re.sub("\/$", "", domain)
374 domain = re.sub("^\@", "", domain)
376 # the biggest retards of them all try to block individual users
377 domain = re.sub("(.+)\@", "", domain)