]> git.mxchange.org Git - fba.git/blob - fba.py
Continued:
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9 import time
10
11 with open("config.json") as f:
12     config = json.loads(f.read())
13
14 blacklist = [
15     "activitypub-troll.cf",
16     "gab.best",
17     "4chan.icu",
18     "social.shrimpcam.pw",
19     "mastotroll.netz.org",
20     "ngrok.io",
21 ]
22
23 headers = {
24     "user-agent": config["useragent"]
25 }
26
27 conn = sqlite3.connect("blocks.db")
28 c = conn.cursor()
29
30 def get_hash(domain: str) -> str:
31     # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
32     return sha256(domain.encode("utf-8")).hexdigest()
33
34 def update_last_error(domain: str, res: any):
35     # NOISY-DEBUG: print("DEBUG: domain,res.status_code", domain, res.status_code)
36
37     try:
38         c.execute("UPDATE instances SET last_status_code = ? WHERE domain = ?", [
39             res.status_code,
40             domain
41         ])
42
43     except:
44         print("ERROR: failed SQL query:", domain)
45         sys.exit(255)
46
47 def update_last_nodeinfo(domain: str):
48     # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
49
50     try:
51         c.execute("UPDATE instances SET last_nodeinfo = ? WHERE domain = ?", [
52             time.time(),
53             domain
54         ])
55
56     except:
57         print("ERROR: failed SQL query:", domain)
58         sys.exit(255)
59
60 def get_peers(domain: str) -> list:
61     # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
62     peers = None
63
64     try:
65         res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
66
67         if not res.ok or res.status_code >= 400:
68             print("WARNING: Cannot fetch peers:", domain)
69             update_last_error(domain, res)
70         else:
71             # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
72             peers = res.json()
73
74     except:
75         print("WARNING: Some error during get():", domain)
76
77     update_last_nodeinfo(domain)
78
79     # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
80     return peers
81
82 def post_json_api(domain: str, path: str, data: str) -> list:
83     try:
84         # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
85         res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5)
86
87         if not res.ok or res.status_code >= 400:
88             print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
89             update_last_error(domain, res)
90             raise
91
92         update_last_nodeinfo(domain)
93         json = res.json()
94     except:
95         print("WARNING: Some error during post():", domain, path, data)
96
97     # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
98     return json
99
100 def fetch_nodeinfo(domain: str) -> list:
101     # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
102
103     requests = [
104        f"https://{domain}/nodeinfo/2.1.json",
105        f"https://{domain}/nodeinfo/2.0",
106        f"https://{domain}/nodeinfo/2.0.json",
107        f"https://{domain}/nodeinfo/2.1",
108        f"https://{domain}/api/v1/instance"
109     ]
110
111     json = None
112     for request in requests:
113         # NOISY-DEBUG: print("DEBUG: Fetching request:", request)
114         res = reqto.get(request, headers=headers, timeout=5)
115
116         # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
117         if res.ok and res.json() is not None:
118             # NOISY-DEBUG: print("DEBUG: Success:", request)
119             json = res.json()
120             break
121         elif not res.ok or res.status_code >= 400:
122             # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
123             update_last_error(domain, res)
124             continue
125
126     if json is None:
127         print("WARNING: Failed fetching nodeinfo from domain:", domain)
128
129     # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
130     update_last_nodeinfo(domain)
131
132     # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
133     return json
134
135 def determine_software(domain: str) -> str:
136     # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
137     software = None
138
139     try:
140         json = fetch_nodeinfo(domain)
141         # NOISY-DEBUG: print("DEBUG: json():", len(json))
142
143         if json["software"]["name"] in ["akkoma", "rebased"]:
144             # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, json["software"]["name"])
145             software = "pleroma"
146         elif json["software"]["name"] in ["hometown", "ecko"]:
147             # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, json["software"]["name"])
148             software = "mastodon"
149         elif json["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
150             # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, json["software"]["name"])
151             software = "misskey"
152         else:
153             # NOISY-DEBUG: print("DEBUG: Using name:", domain, json["software"]["name"])
154             software = json["software"]["name"]
155
156     except:
157         print("WARNING: Could not determine software type:", domain)
158
159     # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
160     return software
161
162 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
163     # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
164     try:
165         c.execute(
166             "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
167             (
168                 reason,
169                 blocker,
170                 blocked,
171                 block_level
172             ),
173         )
174
175     except:
176         print("ERROR: failed SQL query:", reason, blocker, blocked, block_level)
177         sys.exit(255)
178
179 def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
180     # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
181     try:
182         c.execute(
183             "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
184             (
185                 last_seen,
186                 blocker,
187                 blocked,
188                 block_level
189             )
190         )
191
192     except:
193         print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level)
194         sys.exit(255)
195
196 def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
197     # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level,first_added,last_seen:", blocker, blocked, reason, block_level, first_added, last_seen)
198     if blocker.find("@") > 0:
199         print("WARNING: Bad blocker:", blocker)
200         raise
201     elif blocked.find("@") > 0:
202         print("WARNING: Bad blocked:", blocked)
203         raise
204
205     print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
206     try:
207         c.execute(
208             "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
209              (
210                  blocker,
211                  blocked,
212                  reason,
213                  block_level,
214                  first_added,
215                  last_seen
216              ),
217         )
218
219     except:
220         print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen)
221         sys.exit(255)
222
223 def add_instance(domain: str):
224     # NOISY-DEBUG: print("DEBUG: domain:", domain)
225     if domain.find("@") > 0:
226         print("WARNING: Bad domain name:", domain)
227         raise
228
229     print("--- Adding new instance:", domain)
230     try:
231         c.execute(
232             "INSERT INTO instances (domain,hash,software) VALUES (?, ?, ?)",
233             (
234                domain,
235                get_hash(domain),
236                determine_software(domain)
237             ),
238         )
239
240     except:
241         print("ERROR: failed SQL query:", domain)
242         sys.exit(255)
243
244 def send_bot_post(instance: str, blocks: dict):
245     message = instance + " has blocked the following instances:\n\n"
246     truncated = False
247
248     if len(blocks) > 20:
249         truncated = True
250         blocks = blocks[0 : 19]
251
252     for block in blocks:
253         if block["reason"] == None or block["reason"] == '':
254             message = message + block["blocked"] + " with unspecified reason\n"
255         else:
256             if len(block["reason"]) > 420:
257                 block["reason"] = block["reason"][0:419] + "[…]"
258
259             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
260
261     if truncated:
262         message = message + "(the list has been truncated to the first 20 entries)"
263
264     botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
265
266     req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
267         data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
268         headers=botheaders, timeout=10).json()
269
270     return True
271
272 def get_mastodon_blocks(domain: str) -> dict:
273     # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
274     blocks = {
275         "Suspended servers": [],
276         "Filtered media": [],
277         "Limited servers": [],
278         "Silenced servers": [],
279     }
280
281     translations = {
282         "Silenced instances": "Silenced servers",
283         "Suspended instances": "Suspended servers",
284         "Gesperrte Server": "Suspended servers",
285         "Gefilterte Medien": "Filtered media",
286         "Stummgeschaltete Server": "Silenced servers",
287         "停止済みのサーバー": "Suspended servers",
288         "メディアを拒否しているサーバー": "Filtered media",
289         "サイレンス済みのサーバー": "Silenced servers",
290         "שרתים מושעים": "Suspended servers",
291         "מדיה מסוננת": "Filtered media",
292         "שרתים מוגבלים": "Silenced servers",
293         "Serveurs suspendus": "Suspended servers",
294         "Médias filtrés": "Filtered media",
295         "Serveurs limités": "Silenced servers",
296     }
297
298     try:
299         doc = BeautifulSoup(
300             reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
301             "html.parser",
302         )
303     except:
304         print("ERROR: Cannot fetch from domain:", domain)
305         return {}
306
307     for header in doc.find_all("h3"):
308         header_text = header.text
309
310         if header_text in translations:
311             header_text = translations[header_text]
312
313         if header_text in blocks:
314             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
315             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
316                 blocks[header_text].append(
317                     {
318                         "domain": line.find("span").text,
319                         "hash": line.find("span")["title"][9:],
320                         "reason": line.find_all("td")[1].text.strip(),
321                     }
322                 )
323
324     # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
325     return {
326         "reject": blocks["Suspended servers"],
327         "media_removal": blocks["Filtered media"],
328         "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
329     }
330
331 def get_friendica_blocks(domain: str) -> dict:
332     # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
333     blocks = []
334
335     try:
336         doc = BeautifulSoup(
337             reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
338             "html.parser",
339         )
340     except:
341         print("WARNING: Failed to fetch /friendica from domain:", domain)
342         return {}
343
344     blocklist = doc.find(id="about_blocklist")
345
346     # Prevents exceptions:
347     if blocklist is None:
348         # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
349         return {}
350
351     for line in blocklist.find("table").find_all("tr")[1:]:
352         blocks.append({
353             "domain": line.find_all("td")[0].text.strip(),
354             "reason": line.find_all("td")[1].text.strip()
355         })
356
357     # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
358     return {
359         "reject": blocks
360     }
361
362 def get_misskey_blocks(domain: str) -> dict:
363     # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
364     blocks = {
365         "suspended": [],
366         "blocked": []
367     }
368
369     try:
370         counter = 0
371         step = 99
372         while True:
373             # iterating through all "suspended" (follow-only in its terminology)
374             # instances page-by-page, since that troonware doesn't support
375             # sending them all at once
376             try:
377                 if counter == 0:
378                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
379                     doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
380                         "sort": "+caughtAt",
381                         "host": None,
382                         "suspended": True,
383                         "limit": step
384                     }))
385                 else:
386                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
387                     doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
388                         "sort": "+caughtAt",
389                         "host": None,
390                         "suspended": True,
391                         "limit": step,
392                         "offset": counter-1
393                     }))
394
395                 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
396                 if len(doc) == 0:
397                     # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
398                     break
399
400                 for instance in doc:
401                     # just in case
402                     if instance["isSuspended"]:
403                         blocks["suspended"].append(
404                             {
405                                 "domain": instance["host"],
406                                 # no reason field, nothing
407                                 "reason": ""
408                             }
409                         )
410
411                 if len(doc) < step:
412                     # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
413                     break
414
415                 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
416                 counter = counter + step
417
418             except:
419                 print("WARNING: Caught error, exiting loop:", domain)
420                 counter = 0
421                 break
422
423         while True:
424             # same shit, different asshole ("blocked" aka full suspend)
425             try:
426                 if counter == 0:
427                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
428                     doc = post_json_api(domain,"/api/federation/instances", json.dumps({
429                         "sort": "+caughtAt",
430                         "host": None,
431                         "blocked": True,
432                         "limit": step
433                     }))
434                 else:
435                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
436                     doc = post_json_api(domain,"/api/federation/instances", json.dumps({
437                         "sort": "+caughtAt",
438                         "host": None,
439                         "blocked": True,
440                         "limit": step,
441                         "offset": counter-1
442                     }))
443
444                 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
445                 if len(doc) == 0:
446                     # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
447                     break
448
449                 for instance in doc:
450                     if instance["isBlocked"]:
451                         blocks["blocked"].append({
452                             "domain": instance["host"],
453                             "reason": ""
454                         })
455
456                 if len(doc) < step:
457                     # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
458                     break
459
460                 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
461                 counter = counter + step
462
463             except:
464                 counter = 0
465                 break
466
467         # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
468         return {
469             "reject": blocks["blocked"],
470             "followers_only": blocks["suspended"]
471         }
472
473     except:
474         print("WARNING: API request failed for domain:", domain)
475         return {}
476
477 def tidyup(domain: str) -> str:
478     # some retards put their blocks in variable case
479     domain = domain.lower()
480
481     # other retards put the port
482     domain = re.sub("\:\d+$", "", domain)
483
484     # bigger retards put the schema in their blocklist, sometimes even without slashes
485     domain = re.sub("^https?\:(\/*)", "", domain)
486
487     # and trailing slash
488     domain = re.sub("\/$", "", domain)
489
490     # and the @
491     domain = re.sub("^\@", "", domain)
492
493     # the biggest retards of them all try to block individual users
494     domain = re.sub("(.+)\@", "", domain)
495
496     return domain