1 from bs4 import BeautifulSoup
2 from hashlib import sha256
# Load runtime settings from config.json (path is relative to the working directory).
# NOTE(review): `json` and `sqlite3` are used below, but their imports are not
# among the lines visible here — confirm they are imported.
11 with open("config.json") as f:
12 config = json.loads(f.read())
# Domain entries; presumably members of the `blacklist` collection that
# is_blacklisted() iterates — the enclosing literal is not visible here.
15 "activitypub-troll.cf",
18 "social.shrimpcam.pw",
19 "mastotroll.netz.org",
# Header entry taken from the loaded config; presumably part of the `headers`
# dict passed to every reqto call — the enclosing literal is not visible here.
24 "user-agent": config["useragent"]
# Module-wide SQLite connection and cursor on blocks.db, used by the update_*/insert helpers.
27 connection = sqlite3.connect("blocks.db")
28 cursor = connection.cursor()
# Presumably reports whether `domain` matches an entry of the module-level
# `blacklist`. Only the loop header is visible here; the comparison, the
# `blacklisted` flag mentioned in the debug output and the return statement
# are on lines not shown.
30 def is_blacklisted(domain: str) -> bool:
31 # NOISY-DEBUG: print("DEBUG: Checking blacklist for domain:", domain)
33 for peer in blacklist:
34 # NOISY-DEBUG: print("DEBUG: domain,peer:", domain, peer)
38 # NOISY-DEBUG: print("DEBUG: blacklisted:", blacklisted)
def get_hash(domain: str) -> str:
    """Return the SHA-256 digest of ``domain`` as a lowercase hex string.

    The domain is encoded as UTF-8 before hashing.
    """
    # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
    encoded = domain.encode("utf-8")
    digest = sha256(encoded)
    return digest.hexdigest()
# Set last_blocked and last_updated on the `instances` row whose domain equals
# `domain`, using the module-level cursor. The bound values and the error
# handling that leads to the ERROR print are on lines not shown here.
45 def update_last_blocked(domain: str):
46 # NOISY-DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
49 cursor.execute("UPDATE instances SET last_blocked = ?, last_updated = ? WHERE domain = ?", [
56 print("ERROR: failed SQL query:", domain)
# Store last_status_code, last_error_details and last_updated on the
# `instances` row for `domain`. `res` is the failed response; judging by the
# debug line it exposes `status_code` and `reason`. The bound values are on
# lines not shown here.
# NOTE(review): the annotation `any` names the builtin function, not
# typing.Any — harmless at runtime but misleading to type checkers.
59 def update_last_error(domain: str, res: any):
60 # NOISY-DEBUG: print("DEBUG: domain,res.status_code:", domain, res.status_code, res.reason)
62 cursor.execute("UPDATE instances SET last_status_code = ?, last_error_details = ?, last_updated = ? WHERE domain = ?", [
70 print("ERROR: failed SQL query:", domain)
# Set last_nodeinfo and last_updated on the `instances` row for `domain`.
# The bound values and the error handling that leads to the ERROR print are
# on lines not shown here.
73 def update_last_nodeinfo(domain: str):
74 # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
76 cursor.execute("UPDATE instances SET last_nodeinfo = ?, last_updated = ? WHERE domain = ?", [
83 print("ERROR: failed SQL query:", domain)
# Query https://{domain}/api/v1/instance/peers with the shared headers and the
# configured (connection, read) timeouts. A failed response prints a warning
# and is recorded via update_last_error(); update_last_nodeinfo() is called as
# well. The code that builds and returns `peers`, and the exception handling
# that prints "Some error during get()", are on lines not shown here.
# NOTE(review): if `res` behaves like requests.Response, `res.ok` is already
# False for status codes >= 400, so the second half of the check would be
# redundant — confirm against reqto.
88 def get_peers(domain: str) -> list:
89 # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
93 res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
95 if not res.ok or res.status_code >= 400:
96 print("WARNING: Cannot fetch peers:", domain)
97 update_last_error(domain, res)
99 # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
103 print("WARNING: Some error during get():", domain)
105 update_last_nodeinfo(domain)
107 # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
# POST `data` to https://{domain}{path} with the shared headers and the
# configured (connection, read) timeouts. `path` is appended verbatim, so it
# must start with "/". A failed response prints a warning and is recorded via
# update_last_error(); update_last_nodeinfo() is called as well. Decoding the
# response and the return statement are on lines not shown here.
110 def post_json_api(domain: str, path: str, data: str) -> list:
111 # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
113 res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
115 if not res.ok or res.status_code >= 400:
116 print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
117 update_last_error(domain, res)
120 update_last_nodeinfo(domain)
123 print("WARNING: Some error during post():", domain, path, data)
125 # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
# Try a fixed list of URLs on `domain` — nodeinfo 2.1, 2.0 and 1.0, then
# /api/v1/instance — in the order listed, using the shared headers and
# configured timeouts. Failed responses are recorded via update_last_error();
# update_last_nodeinfo() is called near the end. What happens on the first
# successful response (presumably: keep its JSON and stop) and the return
# statement are on lines not shown here.
# NOTE(review): the return annotation is `list`, yet determine_software()
# indexes the result with string keys — the annotation looks wrong.
128 def fetch_nodeinfo(domain: str) -> list:
129 # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
132 f"https://{domain}/nodeinfo/2.1.json",
133 f"https://{domain}/nodeinfo/2.1",
134 f"https://{domain}/nodeinfo/2.0.json",
135 f"https://{domain}/nodeinfo/2.0",
136 f"https://{domain}/nodeinfo/1.0",
137 f"https://{domain}/api/v1/instance"
141 for request in requests:
142 # NOISY-DEBUG: print("DEBUG: Fetching request:", request)
143 res = reqto.get(request, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
145 # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
146 if res.ok and res.json() is not None:
147 # NOISY-DEBUG: print("DEBUG: Success:", request)
150 elif not res.ok or res.status_code >= 400:
151 # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
152 update_last_error(domain, res)
156 print("WARNING: Failed fetching nodeinfo from domain:", domain)
158 # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
159 update_last_nodeinfo(domain)
161 # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
# Determine the software name of `domain`: fetch its nodeinfo, normalise
# ["software"]["name"] with tidyup(), then map known forks onto their parent.
# "hometown"/"ecko" become "mastodon"; per the debug lines, "akkoma"/"rebased"
# map to pleroma and "calckey"/"groundpolis"/"foundkey"/"cherrypick" to misskey
# (those two assignments are on lines not shown). A name containing "/" is
# reduced to its last path segment. The return statement and the conditions
# guarding the remaining warnings are not visible here.
# NOTE(review): the local name `json` shadows the `json` module inside this
# function.
164 def determine_software(domain: str) -> str:
165 # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
169 json = fetch_nodeinfo(domain)
171 # NOISY-DEBUG: print("DEBUG: json():", len(json))
172 software = tidyup(json["software"]["name"])
174 # NOISY-DEBUG: print("DEBUG: BEFORE software:", software)
175 if software in ["akkoma", "rebased"]:
176 # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, software)
178 elif software in ["hometown", "ecko"]:
179 # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, software)
180 software = "mastodon"
181 elif software in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
182 # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, software)
# str.find() returns the index, so `> 0` is only true when "/" occurs after
# the first character; a leading "/" does not take this branch.
184 elif software.find("/") > 0:
185 print("WARNING: Spliting of path:", software)
186 software = software.split("/")[-1];
189 print("WARNING: tidyup() left no software name behind:", domain)
191 # NOISY-DEBUG: print("DEBUG: AFTER software:", software)
193 print("WARNING: Could not determine software type:", domain)
195 # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
# Fill in `reason` (and refresh last_seen) on the matching `blocks` row.
# The WHERE clause includes `reason = ''`, so only rows whose reason is
# currently empty are updated; an existing reason is never overwritten.
# The execute call, bound values and error handling are on lines not shown.
198 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
199 # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
202 "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
213 print("ERROR: failed SQL query:", reason, blocker, blocked, block_level)
# Refresh last_seen on the `blocks` row identified by blocker, blocked and
# block_level. The execute call, bound values and error handling are on lines
# not shown here.
# NOTE(review): the error print references `last_seen`, which is not a
# parameter — confirm it is assigned on a line not shown, otherwise the error
# path itself raises NameError.
216 def update_last_seen(blocker: str, blocked: str, block_level: str):
217 # NOISY: print("--- Updating last_seen for:", blocker, blocked, block_level)
220 "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
230 print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level)
# Insert a new row into `blocks` for (blocker, blocked, reason, block_level)
# together with first_added and last_seen. A blocker or blocked value that
# contains "@" is reported with a warning first; what follows each warning
# (presumably an early exit) is on lines not shown, as are the assignments of
# `first_added` / `last_seen` and the error handling.
# NOTE(review): `find("@") > 0` is False when "@" is the very first
# character, so a value such as "@name" is not flagged by these checks.
233 def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
234 # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
235 if blocker.find("@") > 0:
236 print("WARNING: Bad blocker:", blocker)
238 elif blocked.find("@") > 0:
239 print("WARNING: Bad blocked:", blocked)
242 print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
245 "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
257 print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen)
# Insert a new row into `instances` (domain, origin, originator, hash,
# software, first_seen). `origin` may be None. A domain or origin containing
# "@" is reported with a warning first; what follows each warning is on lines
# not shown. The software column comes from determine_software(domain), which
# fetches nodeinfo over the network, so this call can be slow.
# NOTE(review): `find("@") > 0` does not catch a leading "@".
260 def add_instance(domain: str, origin: str, originator: str):
261 # NOISY-DEBUG: print("DEBUG: domain,origin:", domain, origin, originator)
262 if domain.find("@") > 0:
263 print("WARNING: Bad domain name:", domain)
265 elif origin is not None and origin.find("@") > 0:
266 print("WARNING: Bad origin name:", origin)
269 print(f"--- Adding new instance {domain} (origin: {origin})")
272 "INSERT INTO instances (domain, origin, originator, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
278 determine_software(domain),
284 print("ERROR: failed SQL query:", domain)
# Compose a plain-text status listing the instances blocked by `instance` and
# post it to {config['bot_instance']}/api/v1/statuses with the configured bot
# token and visibility. Each entry of `blocks` is read via block["blocked"]
# and block["reason"]. The loop header and the conditions guarding the
# truncation are on lines not shown here.
# NOTE(review): `blocks` is annotated as dict but is sliced and its entries
# indexed like a list of dicts — the annotation looks wrong.
287 def send_bot_post(instance: str, blocks: dict):
288 message = instance + " has blocked the following instances:\n\n"
# NOTE(review): [0 : 19] keeps at most 19 entries, while the notice below
# says "first 20 entries" — off by one in either the slice or the message.
293 blocks = blocks[0 : 19]
# NOTE(review): `== None` works here, but `is None` is the idiomatic check.
296 if block["reason"] == None or block["reason"] == '':
297 message = message + block["blocked"] + " with unspecified reason\n"
# Reasons longer than 420 characters are cut to their first 419 characters
# plus a "[…]" marker.
299 if len(block["reason"]) > 420:
300 block["reason"] = block["reason"][0:419] + "[…]"
# A zero-width space is inserted after every "@" in the reason, presumably so
# the post does not mention accounts — confirm.
302 message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
305 message = message + "(the list has been truncated to the first 20 entries)"
# Shared headers plus the bearer token from the config.
307 botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
# Uses a fixed 10 second timeout rather than the configured connection/read
# timeouts used by the other requests in this file.
309 req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
310 data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
311 headers=botheaders, timeout=10).json()
# Scrape the block list of a Mastodon instance from https://{domain}/about/more.
# Every <h3> heading is mapped through `translations` onto one of the English
# section names used as keys of `blocks`; for a recognised section the rows of
# the next <table> after the heading (header row skipped) are collected as
# domain / hash / reason entries. The result maps "reject", "media_removal"
# and "followers_only" to those lists, with "followers_only" combining
# "Limited servers" and "Silenced servers". The dict/parser construction and
# the error handling around the request are on lines not shown here.
315 def get_mastodon_blocks(domain: str) -> dict:
316 # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
# Section names used as keys of `blocks`.
318 "Suspended servers": [],
319 "Filtered media": [],
320 "Limited servers": [],
321 "Silenced servers": [],
# Alternative and localised headings mapped onto the section names above.
325 "Silenced instances": "Silenced servers",
326 "Suspended instances": "Suspended servers",
327 "Gesperrte Server": "Suspended servers",
328 "Gefilterte Medien": "Filtered media",
329 "Stummgeschaltete Server": "Silenced servers",
330 "停止済みのサーバー": "Suspended servers",
331 "メディアを拒否しているサーバー": "Filtered media",
332 "サイレンス済みのサーバー": "Silenced servers",
333 "שרתים מושעים": "Suspended servers",
334 "מדיה מסוננת": "Filtered media",
335 "שרתים מוגבלים": "Silenced servers",
336 "Serveurs suspendus": "Suspended servers",
337 "Médias filtrés": "Filtered media",
338 "Serveurs limités": "Silenced servers",
343 reqto.get(f"https://{domain}/about/more", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
347 print("ERROR: Cannot fetch from domain:", domain)
350 for header in doc.find_all("h3"):
351 header_text = header.text
353 if header_text in translations:
354 header_text = translations[header_text]
356 if header_text in blocks:
357 # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
358 for line in header.find_all_next("table")[0].find_all("tr")[1:]:
359 blocks[header_text].append(
361 "domain": tidyup(line.find("span").text),
# The first 9 characters of the span's title attribute are dropped;
# presumably a fixed label in front of the hash — verify against real markup.
362 "hash" : tidyup(line.find("span")["title"][9:]),
363 "reason": tidyup(line.find_all("td")[1].text),
367 # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
369 "reject" : blocks["Suspended servers"],
370 "media_removal" : blocks["Filtered media"],
371 "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
# Scrape the block list of a Friendica instance from https://{domain}/friendica.
# The element with id "about_blocklist" holds a table; each row after the
# header row contributes the text of its first cell as domain and of its
# second cell as reason, both passed through tidyup(). The parser construction,
# what is returned when no block list is present, the container the entries
# are appended to and the final return are on lines not shown here.
374 def get_friendica_blocks(domain: str) -> dict:
375 # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
380 reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
384 print("WARNING: Failed to fetch /friendica from domain:", domain)
387 blocklist = doc.find(id="about_blocklist")
389 # Prevents exceptions:
390 if blocklist is None:
391 # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
394 for line in blocklist.find("table").find_all("tr")[1:]:
396 "domain": tidyup(line.find_all("td")[0].text),
397 "reason": tidyup(line.find_all("td")[1].text)
400 # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
# Collect the block list of a Misskey instance through its federation
# instances API using post_json_api(). Two passes are made, each advancing
# `counter` by `step`: the first gathers hosts flagged isSuspended, the second
# hosts flagged isBlocked. The result maps "reject" to the blocked hosts and
# "followers_only" to the suspended hosts. The loop headers, request payloads
# beyond "sort", termination conditions and error handling are on lines not
# shown here.
# NOTE(review): the first pass posts to "/api/federation/instances/" (trailing
# slash) and the second to "/api/federation/instances" — confirm both paths
# are accepted by the server.
405 def get_misskey_blocks(domain: str) -> dict:
406 # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
416 # iterating through all "suspended" (follow-only in its terminology)
417 # instances page-by-page, since the API does not support
418 # sending them all at once
421 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
422 doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
423 "sort" : "+caughtAt",
429 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
430 doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
431 "sort" : "+caughtAt",
438 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
440 # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
445 if instance["isSuspended"]:
446 blocks["suspended"].append(
448 "domain": tidyup(instance["host"]),
449 # no reason field, nothing
455 # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
458 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
459 counter = counter + step
462 print("WARNING: Caught error, exiting loop:", domain)
467 # same paging approach again, this time for "blocked" (aka full suspend) instances
470 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
471 doc = post_json_api(domain,"/api/federation/instances", json.dumps({
472 "sort" : "+caughtAt",
478 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
479 doc = post_json_api(domain,"/api/federation/instances", json.dumps({
480 "sort" : "+caughtAt",
487 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
489 # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
493 if instance["isBlocked"]:
494 blocks["blocked"].append({
495 "domain": tidyup(instance["host"]),
500 # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
503 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
504 counter = counter + step
510 # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
512 "reject" : blocks["blocked"],
513 "followers_only": blocks["suspended"]
517 print("WARNING: API request failed for domain:", domain)
# Normalise a domain string taken from a block list: lower-case and strip it,
# then remove a trailing ":port", a leading "http:" / "https:" scheme with any
# number of slashes, one trailing "/", a leading "@", and finally everything
# up to and including the last "@". The return statement is not among the
# lines visible here.
# NOTE(review): the patterns are ordinary (non-raw) string literals that
# contain backslash escapes such as "\:" and "\d"; newer Python versions warn
# about invalid escape sequences — raw strings (r"...") would avoid that
# without changing the patterns.
520 def tidyup(string: str) -> str:
521 # NOISY-DEBUG: print("DEBUG: BEFORE string:", string)
523 # some block lists use mixed case or carry surrounding whitespace
524 string = string.lower().strip()
526 # some entries carry a port suffix
527 string = re.sub("\:\d+$", "", string)
529 # some entries include the URL scheme, sometimes even without slashes
530 string = re.sub("^https?\:(\/*)", "", string)
# drop a single trailing slash
533 string = re.sub("\/$", "", string)
# drop a leading "@"
536 string = re.sub("^\@", "", string)
538 # some entries name individual users (user@domain); keep only what follows the last "@"
539 string = re.sub("(.+)\@", "", string)
541 # NOISY-DEBUG: print("DEBUG: AFTER string:", string)