1 from bs4 import BeautifulSoup
2 from hashlib import sha256
# Module-level setup. The configuration is read once from config.json.
11 with open("config.json") as f:
12 config = json.loads(f.read())
# Domain name entries of a list literal; presumably the `blacklist` that
# is_blacklisted() iterates over — TODO confirm.
15 "activitypub-troll.cf",
18 "social.shrimpcam.pw",
19 "mastotroll.netz.org",
# Entry of a header mapping (presumably the `headers` passed to the reqto
# calls in this file): the user agent string comes from the configuration.
24 "user-agent": config["useragent"]
# Shared SQLite connection and cursor; the cursor is what the UPDATE and
# INSERT statements in this file are executed on.
27 connection = sqlite3.connect("blocks.db")
28 cursor = connection.cursor()
# Check a domain against the module-level `blacklist`; annotated to return a bool.
# NOTE(review): the comparison applied to each peer is not shown here.
30 def is_blacklisted(domain: str) -> bool:
32 for peer in blacklist:
# Return the hexadecimal SHA-256 digest of the UTF-8 encoded domain name.
38 def get_hash(domain: str) -> str:
39 return sha256(domain.encode("utf-8")).hexdigest()
# Set last_blocked and last_updated on the `instances` row of the given domain.
41 def update_last_blocked(domain: str):
42 # NOISY-DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
# Parameterized UPDATE on the shared cursor; the bound values follow in the list.
44 cursor.execute("UPDATE instances SET last_blocked = ?, last_updated = ? WHERE domain = ?", [
# Error report for a failed statement; only the domain is printed.
51 print("ERROR: failed SQL query:", domain)
# Store the status code and error details of a response on the `instances`
# row of the given domain, and refresh last_updated.
# NOTE(review): `any` in the annotation is the builtin function, not typing.Any.
54 def update_last_error(domain: str, res: any):
55 # NOISY-DEBUG: print("DEBUG: domain,res.status_code:", domain, res.status_code, res.reason)
# Parameterized UPDATE; presumably res.status_code and res.reason are bound
# to the first two placeholders — TODO confirm.
57 cursor.execute("UPDATE instances SET last_status_code = ?, last_error_details = ?, last_updated = ? WHERE domain = ?", [
# Error report for a failed statement; only the domain is printed.
65 print("ERROR: failed SQL query:", domain)
# Set last_nodeinfo and last_updated on the `instances` row of the given domain.
68 def update_last_nodeinfo(domain: str):
69 # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
# Parameterized UPDATE on the shared cursor; the bound values follow in the list.
71 cursor.execute("UPDATE instances SET last_nodeinfo = ?, last_updated = ? WHERE domain = ?", [
# Error report for a failed statement; only the domain is printed.
78 print("ERROR: failed SQL query:", domain)
# Fetch the peer list of an instance from its /api/v1/instance/peers endpoint.
# Annotated to return a list.
83 def get_peers(domain: str) -> list:
84 # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
# GET with the shared headers; the timeout tuple is built from the configured
# connection and read timeouts.
88 res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
# NOTE(review): if reqto responses behave like requests.Response, `ok` is
# already False for status codes >= 400, so the second test is redundant — confirm.
90 if not res.ok or res.status_code >= 400:
91 print("WARNING: Cannot fetch peers:", domain)
# The failed response is recorded on the instance row.
92 update_last_error(domain, res)
94 # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
# Presumably printed from an exception handler around the request — TODO confirm.
98 print("WARNING: Some error during get():", domain)
# Refresh last_nodeinfo for the domain.
100 update_last_nodeinfo(domain)
102 # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
# POST `data` to https://{domain}{path} and hand back the response; annotated
# to return a list.
# `path` is appended to the host verbatim, so it needs its own leading slash.
105 def post_json_api(domain: str, path: str, data: str) -> list:
106 # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
# Uses the shared headers; the timeout tuple is built from the configured
# connection and read timeouts.
109 res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
# NOTE(review): if reqto responses behave like requests.Response, `ok` is
# already False for status codes >= 400, so the second test is redundant — confirm.
111 if not res.ok or res.status_code >= 400:
112 print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
# The failed response is recorded on the instance row.
113 update_last_error(domain, res)
# Refresh last_nodeinfo for the domain.
116 update_last_nodeinfo(domain)
# Presumably printed from an exception handler around the request — TODO confirm.
119 print("WARNING: Some error during post():", domain, path, data)
121 # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
# Fetch nodeinfo for a domain by trying a fixed set of URLs, falling back to
# the .well-known lookup when nothing usable was found.
# NOTE(review): annotated `-> list`, but determine_software() indexes the
# result with string keys, so the value is presumably a dict — confirm.
# NOTE(review): the local name `json` shadows the `json` module inside this function.
124 def fetch_nodeinfo(domain: str) -> list:
125 # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
# Candidate URLs, newest nodeinfo schema first, with the Mastodon-style
# instance API as the last entry.
128 f"https://{domain}/nodeinfo/2.1.json",
129 f"https://{domain}/nodeinfo/2.1",
130 f"https://{domain}/nodeinfo/2.0.json",
131 f"https://{domain}/nodeinfo/2.0",
132 f"https://{domain}/nodeinfo/1.0",
133 f"https://{domain}/api/v1/instance"
# `requests` here is a local iterable of URLs (presumably the entries above).
137 for request in requests:
139 # NOISY-DEBUG: print("DEBUG: Fetching request:", request)
140 res = reqto.get(request, headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
142 # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
# Success requires an OK response whose body decodes to something other than None.
143 if res.ok and res.json() is not None:
144 # NOISY-DEBUG: print("DEBUG: Success:", request)
# Failed responses are recorded on the instance row.
147 elif not res.ok or res.status_code >= 400:
148 # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
149 update_last_error(domain, res)
153 # NOISY-DEBUG: print("DEBUG: Cannot fetch API request:", request)
156 # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
# Nothing usable from the direct URLs: fall back to /.well-known/nodeinfo.
157 if json is None or len(json) == 0:
158 print("WARNING: Failed fetching nodeinfo from domain:", domain)
159 json = fetch_wellknown_nodeinfo(domain)
161 # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
# Resolve nodeinfo through the /.well-known/nodeinfo discovery document:
# read its "links" and fetch the href advertised for the nodeinfo 2.0 schema.
164 def fetch_wellknown_nodeinfo(domain: str) -> list:
165 # NOISY-DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
169 res = reqto.get(f"https://{domain}/.well-known/nodeinfo", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"]))
170 # NOISY-DEBUG: print("DEBUG: domain,res.ok:", domain, res.ok)
171 if res.ok and res.json() is not None:
172 nodeinfo = res.json()
173 # NOISY-DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain)
174 if "links" in nodeinfo:
175 # NOISY-DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"]))
176 for link in nodeinfo["links"]:
177 # NOISY-DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
# Only the schema 2.0 relation is followed; other rel values are ignored here.
178 if link["rel"] == "http://nodeinfo.diaspora.software/ns/schema/2.0":
179 # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
# NOTE(review): unlike the other reqto.get() calls in this file, this one
# passes neither the shared headers nor a timeout.
180 res = reqto.get(link["href"])
181 # NOISY-DEBUG: print("DEBUG: href,res.ok:", link["href"], res.ok)
182 if res.ok and res.json() is not None:
183 # NOISY-DEBUG: print("DEBUG: Found JSON nodeinfo():", len(res.json()))
188 print("WARNING: Failed fetching .well-known info:", domain)
191 # NOISY-DEBUG: print("DEBUG: Returning json[]:", type(json))
# Determine the software name of an instance from its nodeinfo, guessing
# when [software][name] is absent and folding known forks into their parent.
194 def determine_software(domain: str) -> str:
195 # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
198 json = fetch_nodeinfo(domain)
199 # NOISY-DEBUG: print("DEBUG: json[]:", type(json))
# No nodeinfo at all: the software type cannot be determined.
201 if json is None or len(json) == 0:
202 # NOISY-DEBUG: print("DEBUG: Could not determine software type:", domain)
205 # NOISY-DEBUG: print("DEBUG: json():", len(json), json)
206 if "software" not in json or "name" not in json["software"]:
207 print("WARNING: JSON response does not include [software][name], guessing ...")
# Keys to look for in the response; `found` is presumably the number of
# them present in json — TODO confirm.
209 for element in {"uri", "title", "description", "email", "version", "urls", "stats", "thumbnail", "languages", "contact_account"}:
213 # NOISY-DEBUG: print("DEBUG: Found elements:", found)
# When the count matches the number of entries in json, the instance is
# taken to be Mastodon (per the debug message below).
214 if found == len(json):
215 # NOISY-DEBUG: print("DEBUG: Maybe is Mastodon:", domain)
218 print("WARNING: Cannot guess software type:", domain, found, len(json))
# Normalize the reported software name before comparing it.
221 software = tidyup(json["software"]["name"])
223 # NOISY-DEBUG: print("DEBUG: tidyup software:", software)
# Forks are mapped onto their parent project (pleroma / mastodon / misskey,
# per the debug messages of each branch).
224 if software in ["akkoma", "rebased"]:
225 # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, software)
227 elif software in ["hometown", "ecko"]:
228 # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, software)
229 software = "mastodon"
230 elif software in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
231 # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, software)
# A name containing a slash is reduced to its last path segment.
# NOTE(review): find() > 0 does not match a slash at index 0.
233 elif software.find("/") > 0:
234 print("WARNING: Spliting of path:", software)
# NOTE(review): the trailing semicolon is unnecessary in Python.
235 software = software.split("/")[-1];
238 print("WARNING: tidyup() left no software name behind:", domain)
241 # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
# Fill in the reason of an existing block row and refresh its last_seen.
244 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
245 # NOISY: # NOISY-DEBUG: print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
# The `reason = ''` condition restricts the update to rows whose reason is
# still empty; an already stored reason is not overwritten.
248 "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
# Error report for a failed statement.
259 print("ERROR: failed SQL query:", reason, blocker, blocked, block_level)
# Refresh last_seen on the block row identified by blocker, blocked and block_level.
262 def update_last_seen(blocker: str, blocked: str, block_level: str):
263 # NOISY: # NOISY-DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
266 "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
# NOTE(review): `last_seen` is not a parameter; it must be assigned earlier in
# this function (not shown here), otherwise this line raises NameError — verify.
276 print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level)
# Insert a new row into `blocks` for the given blocker/blocked pair.
279 def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
280 # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
# Names containing "@" are rejected with a warning. find() > 0 only matches
# an "@" after the first character, so a leading "@" passes this check.
281 if blocker.find("@") > 0:
282 print("WARNING: Bad blocker:", blocker)
284 elif blocked.find("@") > 0:
285 print("WARNING: Bad blocked:", blocked)
# first_added and last_seen are not parameters; presumably they are set
# earlier in this function — TODO confirm.
288 print("INFO: New block:", blocker, blocked, reason, block_level, first_added, last_seen)
# Parameterized INSERT; six values are bound.
291 "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
# Error report for a failed statement.
303 print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen)
# Register a newly seen instance: determine its software, insert a row into
# `instances`, then refresh its last_nodeinfo.
306 def add_instance(domain: str, origin: str, originator: str):
307 # NOISY-DEBUG: print("DEBUG: domain,origin:", domain, origin, originator)
# Names containing "@" after the first character are rejected with a warning;
# `origin` may be None and is only checked when set.
308 if domain.find("@") > 0:
309 print("WARNING: Bad domain name:", domain)
311 elif origin is not None and origin.find("@") > 0:
312 print("WARNING: Bad origin name:", origin)
# This performs network requests (via fetch_nodeinfo) before the row is inserted.
315 software = determine_software(domain)
316 # NOISY-DEBUG: print("DEBUG: Determined software:", software)
318 print(f"INFO: Adding new instance {domain} (origin: {origin})")
# Parameterized INSERT; six values are bound.
321 "INSERT INTO instances (domain, origin, originator, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
# Error report for a failed statement; only the domain is printed.
333 print("ERROR: failed SQL query:", domain)
336 # NOISY-DEBUG: print("DEBUG: Updating nodeinfo for domain:", domain)
337 update_last_nodeinfo(domain)
# Compose a plain-text status listing the given blocks of an instance and
# post it through the configured bot account.
# NOTE(review): `blocks` is annotated as dict but is sliced below, which a
# dict does not support; presumably a list of dicts with "blocked" and
# "reason" keys — confirm against the caller.
339 def send_bot_post(instance: str, blocks: dict):
340 message = instance + " has blocked the following instances:\n\n"
# NOTE(review): this slice keeps 19 entries (indices 0..18), while the
# truncation notice below speaks of the first 20 — likely an off-by-one.
345 blocks = blocks[0 : 19]
# NOTE(review): `is None` is the idiomatic comparison here.
348 if block["reason"] == None or block["reason"] == '':
349 message = message + block["blocked"] + " with unspecified reason\n"
# Reasons longer than 420 characters are cut to their first 419 characters
# plus a "[…]" marker.
351 if len(block["reason"]) > 420:
352 block["reason"] = block["reason"][0:419] + "[…]"
# A zero-width space (U+200B) is inserted after every "@", presumably so the
# posted text does not create mentions — TODO confirm.
354 message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
357 message = message + "(the list has been truncated to the first 20 entries)"
# Shared headers plus the bearer token of the bot account.
359 botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
# Posts the status with a fixed 10 second timeout and decodes the JSON reply.
361 req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
362 data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
363 headers=botheaders, timeout=10).json()
# Scrape the /about/more page of a Mastodon instance and collect the block
# tables found under its <h3> headings, grouped by block type.
367 def get_mastodon_blocks(domain: str) -> dict:
368 # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
# Result buckets, keyed by the English heading text.
370 "Suspended servers": [],
371 "Filtered media" : [],
372 "Limited servers" : [],
373 "Silenced servers" : [],
# Heading translations: older English wording and German, Japanese, Hebrew
# and French headings are mapped onto the bucket names above.
377 "Silenced instances": "Silenced servers",
378 "Suspended instances": "Suspended servers",
379 "Gesperrte Server": "Suspended servers",
380 "Gefilterte Medien": "Filtered media",
381 "Stummgeschaltete Server": "Silenced servers",
382 "停止済みのサーバー": "Suspended servers",
383 "メディアを拒否しているサーバー": "Filtered media",
384 "サイレンス済みのサーバー": "Silenced servers",
385 "שרתים מושעים": "Suspended servers",
386 "מדיה מסוננת": "Filtered media",
387 "שרתים מוגבלים": "Silenced servers",
388 "Serveurs suspendus": "Suspended servers",
389 "Médias filtrés": "Filtered media",
390 "Serveurs limités": "Silenced servers",
# The page text is fetched with the shared headers and configured timeouts.
395 reqto.get(f"https://{domain}/about/more", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
399 print("ERROR: Cannot fetch from domain:", domain)
402 for header in doc.find_all("h3"):
403 header_text = header.text
# Translate localized headings to their English bucket name.
405 if header_text in translations:
406 header_text = translations[header_text]
# Headings that are not one of the buckets are ignored.
408 if header_text in blocks:
409 # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
# Takes the first <table> after the heading and skips its first row.
# NOTE(review): [0] raises IndexError when no table follows the heading,
# unless that is handled in lines not shown here.
410 for line in header.find_all_next("table")[0].find_all("tr")[1:]:
411 blocks[header_text].append(
413 "domain": tidyup(line.find("span").text),
# The first 9 characters of the span's title attribute are dropped;
# presumably a fixed prefix in front of the hash — TODO confirm.
414 "hash" : tidyup(line.find("span")["title"][9:]),
415 "reason": tidyup(line.find_all("td")[1].text),
419 # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
# Buckets are renamed to block levels; "Limited servers" and
# "Silenced servers" are merged into followers_only.
421 "reject" : blocks["Suspended servers"],
422 "media_removal" : blocks["Filtered media"],
423 "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
# Scrape the /friendica page of a Friendica instance and collect the rows of
# the table inside the element with id "about_blocklist".
426 def get_friendica_blocks(domain: str) -> dict:
427 # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
# The page text is fetched with the shared headers and configured timeouts.
432 reqto.get(f"https://{domain}/friendica", headers=headers, timeout=(config["connection_timeout"], config["read_timeout"])).text,
436 print("WARNING: Failed to fetch /friendica from domain:", domain)
439 blocklist = doc.find(id="about_blocklist")
441 # Prevents exceptions:
# Without this guard, blocklist.find() below would fail on None.
442 if blocklist is None:
443 # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
# The first table row is skipped; the first cell is the domain and the
# second cell is the reason.
446 for line in blocklist.find("table").find_all("tr")[1:]:
448 "domain": tidyup(line.find_all("td")[0].text),
449 "reason": tidyup(line.find_all("td")[1].text)
452 # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
# Collect suspended and blocked instances from a Misskey-style federation API.
# Results are paged: `counter` is advanced by `step` between requests.
457 def get_misskey_blocks(domain: str) -> dict:
458 # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
468 # iterating through all "suspended" (follow-only in its terminology)
469 # instances page-by-page, since the API does not support
470 # sending them all at once
473 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
# Two request variants follow; presumably one for the first page and one for
# later pages — TODO confirm.
# NOTE(review): this path ends in a slash, while the "blocked" requests
# further down use "/api/federation/instances" without one.
474 doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
475 "sort" : "+caughtAt",
481 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
482 doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
483 "sort" : "+caughtAt",
490 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
492 # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
# Only entries flagged isSuspended are kept; the host name is normalized.
497 if instance["isSuspended"]:
498 blocks["suspended"].append(
500 "domain": tidyup(instance["host"]),
501 # no reason field, nothing
507 # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
510 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
511 counter = counter + step
514 print("WARNING: Caught error, exiting loop:", domain)
519 # same paging procedure for "blocked" instances (aka full suspend)
522 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
523 doc = post_json_api(domain,"/api/federation/instances", json.dumps({
524 "sort" : "+caughtAt",
530 # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
531 doc = post_json_api(domain,"/api/federation/instances", json.dumps({
532 "sort" : "+caughtAt",
539 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
541 # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
# Only entries flagged isBlocked are kept; the host name is normalized.
545 if instance["isBlocked"]:
546 blocks["blocked"].append({
547 "domain": tidyup(instance["host"]),
552 # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
555 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
556 counter = counter + step
562 # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
# Blocked instances are reported as "reject", suspended ones as "followers_only".
564 "reject" : blocks["blocked"],
565 "followers_only": blocks["suspended"]
569 print("WARNING: API request failed for domain:", domain)
# Normalize a scraped domain string: lower-case and strip it, then remove a
# trailing port, a leading URL scheme, a trailing slash and any "user@" prefix.
# NOTE(review): the patterns below are ordinary (non-raw) string literals
# containing backslash escapes such as "\:" and "\d"; raw strings (r"...")
# would avoid Python's invalid-escape-sequence warnings.
# NOTE(review): the function is annotated `-> str`; its return statement is
# not shown here.
572 def tidyup(string: str) -> str:
573 # block lists use inconsistent letter case and stray whitespace
574 string = string.lower().strip()
576 # some entries carry a trailing port number
577 string = re.sub("\:\d+$", "", string)
579 # some entries include the URL scheme, sometimes even without slashes
580 string = re.sub("^https?\:(\/*)", "", string)
# drop a single trailing slash
583 string = re.sub("\/$", "", string)
# drop a leading "@"
586 string = re.sub("^\@", "", string)
588 # some entries target individual users; keep only the part after the last "@"
589 string = re.sub("(.+)\@", "", string)