]> git.mxchange.org Git - fba.git/blob - fba.py
Continued:
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9 import time
10
# Runtime settings are read once, at import time, from "config.json" in the
# current working directory. Keys read in this file: "useragent", "timeout",
# "bot_token", "bot_instance" and "bot_visibility".
with open("config.json") as f:
    config = json.loads(f.read())

# Domains to be excluded. NOTE(review): nothing in this part of the file reads
# this list - presumably the crawler scripts importing this module do; confirm.
blacklist = [
    "activitypub-troll.cf",
    "gab.best",
    "4chan.icu",
    "social.shrimpcam.pw",
    "mastotroll.netz.org",
    "ngrok.io",
]

# HTTP headers passed to the reqto.get()/reqto.post() calls below.
headers = {
    "user-agent": config["useragent"]
}

# Module-wide SQLite connection and cursor; the helper functions below all
# execute their statements through the shared cursor `c`.
# NOTE(review): no commit happens in the visible part of the file - presumably
# callers commit via `conn`; confirm.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
29
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of *domain* (encoded as UTF-8)."""
    digest = sha256()
    digest.update(domain.encode("utf-8"))
    return digest.hexdigest()
33
def update_last_blocked(domain: str):
    """Stamp ``last_blocked`` and ``last_updated`` of *domain* with the current time.

    Both columns receive the same timestamp. If the UPDATE fails, the error
    is printed and the process exits with status 255.
    """
    # Take the clock once so both columns agree exactly.
    now = time.time()

    try:
        c.execute("UPDATE instances SET last_blocked = ?, last_updated = ? WHERE domain = ?", [
            now,
            now,
            domain
        ])

    except sqlite3.Error as exc:
        # Only database errors are fatal here; report what actually went wrong.
        print("ERROR: failed SQL query:", domain, exc)
        sys.exit(255)
47
def update_last_error(domain: str, res: any):
    """Record the HTTP status code of a failed request for *domain*.

    *res* must expose a ``status_code`` attribute; its value is written to
    ``last_status_code`` and ``last_updated`` is set to the current time.
    If the UPDATE fails, the error is printed and the process exits with
    status 255.
    """
    # Read the arguments outside the try block so that only genuine database
    # failures are reported as a failed SQL query.
    status_code = res.status_code
    now = time.time()

    try:
        c.execute("UPDATE instances SET last_status_code = ?, last_updated = ? WHERE domain = ?", [
            status_code,
            now,
            domain
        ])

    except sqlite3.Error as exc:
        print("ERROR: failed SQL query:", domain, exc)
        sys.exit(255)
61
def update_last_nodeinfo(domain: str):
    """Stamp ``last_nodeinfo`` and ``last_updated`` of *domain* with the current time.

    Both columns receive the same timestamp. If the UPDATE fails, the error
    is printed and the process exits with status 255.
    """
    # Take the clock once so both columns agree exactly.
    now = time.time()

    try:
        c.execute("UPDATE instances SET last_nodeinfo = ?, last_updated = ? WHERE domain = ?", [
            now,
            now,
            domain
        ])

    except sqlite3.Error as exc:
        # Only database errors are fatal here; report what actually went wrong.
        print("ERROR: failed SQL query:", domain, exc)
        sys.exit(255)
75
def get_peers(domain: str) -> list:
    """Fetch the peer list of *domain* from ``/api/v1/instance/peers``.

    Returns the decoded JSON body on success, or ``None`` when the request
    fails, is answered with an error status, or the body cannot be decoded.
    An error status is recorded through ``update_last_error()``. In every
    case ``update_last_nodeinfo()`` is called afterwards.
    """
    peers = None

    try:
        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=config["timeout"])

        if not res.ok or res.status_code >= 400:
            print("WARNING: Cannot fetch peers:", domain)
            update_last_error(domain, res)
        else:
            peers = res.json()

    except Exception as exc:
        # `Exception` (not a bare except) so that the SystemExit raised by
        # update_last_error() on a database failure, and Ctrl-C, still
        # terminate the program instead of being swallowed here.
        print("WARNING: Some error during get():", domain, exc)

    update_last_nodeinfo(domain)

    return peers
97
def post_json_api(domain: str, path: str, data: str) -> list:
    """POST *data* to ``https://{domain}{path}`` and return the decoded JSON reply.

    Returns ``None`` when the request fails, is answered with an error
    status, or the body is not valid JSON; a warning is printed in each case.
    An error status is additionally recorded through ``update_last_error()``;
    a successful request refreshes ``last_nodeinfo``.

    Previously every failure path ended in an UnboundLocalError on the
    return statement, so callers must be prepared for ``None`` instead.
    """
    # Named `result` rather than `json` to avoid shadowing the json module.
    result = None

    try:
        res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=config["timeout"])

        if not res.ok or res.status_code >= 400:
            print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
            update_last_error(domain, res)
            return None

        update_last_nodeinfo(domain)
        result = res.json()

    except Exception as exc:
        # `Exception` (not a bare except) so that the SystemExit raised by the
        # update_*() helpers on a database failure is not swallowed here.
        print("WARNING: Some error during post():", domain, path, data, exc)

    return result
115
def fetch_nodeinfo(domain: str) -> list:
    """Return the first usable JSON document describing *domain*, or ``None``.

    The nodeinfo 2.1/2.0 URLs are tried first (with and without a ``.json``
    suffix), then ``/api/v1/instance``. An error status is recorded through
    ``update_last_error()`` and the next URL is tried; a successful reply
    whose body is not JSON is skipped as well. ``update_last_nodeinfo()`` is
    called once all attempts are done.

    Exceptions raised by the HTTP request itself are not caught here and
    propagate to the caller.
    """
    urls = [
        f"https://{domain}/nodeinfo/2.1.json",
        f"https://{domain}/nodeinfo/2.0",
        f"https://{domain}/nodeinfo/2.0.json",
        f"https://{domain}/nodeinfo/2.1",
        f"https://{domain}/api/v1/instance"
    ]

    nodeinfo = None
    for url in urls:
        res = reqto.get(url, headers=headers, timeout=config["timeout"])

        if not res.ok or res.status_code >= 400:
            update_last_error(domain, res)
            continue

        try:
            # Decode once; the body was previously parsed twice.
            data = res.json()
        except ValueError:
            # Successful status but the body is not JSON (for example an HTML
            # page served for an unknown path): try the next candidate
            # instead of aborting the whole probe.
            continue

        if data is not None:
            nodeinfo = data
            break

    if nodeinfo is None:
        print("WARNING: Failed fetching nodeinfo from domain:", domain)

    update_last_nodeinfo(domain)

    return nodeinfo
150
def determine_software(domain: str) -> str:
    """Return the software name *domain* reports, with known forks folded.

    The name is read from ``["software"]["name"]`` of the document returned
    by ``fetch_nodeinfo()``. Fork names are mapped onto the project they are
    grouped with here (for example "akkoma" -> "pleroma"); any other name is
    returned unchanged. Returns ``None``, after printing a warning, when the
    name cannot be determined.
    """
    # Fork name -> name it is stored under.
    aliases = {
        "akkoma": "pleroma",
        "rebased": "pleroma",
        "hometown": "mastodon",
        "ecko": "mastodon",
        "calckey": "misskey",
        "groundpolis": "misskey",
        "foundkey": "misskey",
        "cherrypick": "misskey",
    }

    try:
        nodeinfo = fetch_nodeinfo(domain)
        name = nodeinfo["software"]["name"]
        return aliases.get(name, name)

    except Exception as exc:
        # Covers request failures as well as a missing or malformed document
        # (None, no "software" key, ...). `Exception` rather than a bare
        # except so SystemExit and Ctrl-C are not swallowed.
        print("WARNING: Could not determine software type:", domain, exc)

    return None
177
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Fill in the reason of an existing block that has none yet.

    Only rows matching *blocker*, *blocked* and *block_level* whose reason is
    the empty string are updated; their ``last_seen`` is set to the current
    time as well. If the UPDATE fails, the error is printed and the process
    exits with status 255.
    """
    try:
        c.execute(
            "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
            (
                reason,
                time.time(),
                blocker,
                blocked,
                block_level
            ),
        )

    except sqlite3.Error as exc:
        # Only database errors are fatal here; report what actually went wrong.
        print("ERROR: failed SQL query:", reason, blocker, blocked, block_level, exc)
        sys.exit(255)
195
def update_last_seen(blocker: str, blocked: str, block_level: str):
    """Set ``last_seen`` of the matching block row to the current time.

    The row is selected by *blocker*, *blocked* and *block_level*. If the
    UPDATE fails, the error is printed and the process exits with status 255.
    """
    now = time.time()

    try:
        c.execute(
            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
            (
                now,
                blocker,
                blocked,
                block_level
            )
        )

    except sqlite3.Error as exc:
        # The handler used to print an undefined name `last_seen`, which
        # replaced the intended message and exit code with a NameError.
        print("ERROR: failed SQL query:", now, blocker, blocked, block_level, exc)
        sys.exit(255)
212
def block_instance(blocker: str, blocked: str, reason: str, block_level: str):
    """Insert a new row into the ``blocks`` table.

    ``first_added`` and ``last_seen`` are both set to the current time.

    Raises:
        RuntimeError: if *blocker* or *blocked* contains an "@" after its
            first character. The bare ``raise`` used before produced a
            RuntimeError too, so the exception type is unchanged.

    If the INSERT fails, the error is printed and the process exits with
    status 255.
    """
    # NOTE(review): find() > 0 does not reject a *leading* "@"; kept as-is.
    if blocker.find("@") > 0:
        print("WARNING: Bad blocker:", blocker)
        raise RuntimeError(f"Bad blocker: {blocker}")
    elif blocked.find("@") > 0:
        print("WARNING: Bad blocked:", blocked)
        raise RuntimeError(f"Bad blocked: {blocked}")

    # One timestamp serves both columns. The log lines used to reference the
    # undefined names `first_added` and `last_seen`, so every call failed
    # with a NameError before reaching the INSERT.
    now = time.time()

    print("--- New block:", blocker, blocked, reason, block_level, now, now)
    try:
        c.execute(
            "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
            (
                blocker,
                blocked,
                reason,
                block_level,
                now,
                now
            ),
        )

    except sqlite3.Error as exc:
        print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, now, now, exc)
        sys.exit(255)
239
def add_instance(domain: str, origin: str, originator: str):
    """Insert a new row for *domain* into the ``instances`` table.

    The row stores *origin*, *originator*, the domain's hash from
    ``get_hash()``, the software name from ``determine_software()`` (which
    may be ``None``) and the current time as ``first_seen``.

    Raises:
        RuntimeError: if *domain*, or a non-``None`` *origin*, contains an
            "@" after its first character. The bare ``raise`` used before
            produced a RuntimeError too, so the exception type is unchanged.

    If the INSERT fails, the error is printed and the process exits with
    status 255.
    """
    # NOTE(review): find() > 0 does not reject a *leading* "@"; kept as-is.
    if domain.find("@") > 0:
        print("WARNING: Bad domain name:", domain)
        raise RuntimeError(f"Bad domain name: {domain}")
    elif origin is not None and origin.find("@") > 0:
        print("WARNING: Bad origin name:", origin)
        raise RuntimeError(f"Bad origin name: {origin}")

    print(f"--- Adding new instance {domain} (origin: {origin})")

    # Build the row outside the try block: determine_software() performs
    # network requests, and its failures are not SQL failures.
    row = (
        domain,
        origin,
        originator,
        get_hash(domain),
        determine_software(domain),
        time.time()
    )

    try:
        c.execute(
            "INSERT INTO instances (domain, origin, originator, hash, software, first_seen) VALUES (?, ?, ?, ?, ?, ?)",
            row,
        )

    except sqlite3.Error as exc:
        print("ERROR: failed SQL query:", domain, exc)
        sys.exit(255)
266
def send_bot_post(instance: str, blocks: list):
    """Post a status listing the instances that *instance* has blocked.

    *blocks* is a sequence of dicts with the keys "blocked" and "reason".
    At most the first 20 entries are listed and a note is appended when the
    list was cut. Reasons longer than 420 characters are shortened, and "@"
    is followed by a zero-width space in the reason text. The dicts passed
    in are not modified.

    The status is posted to ``config["bot_instance"]`` using
    ``config["bot_token"]`` and ``config["bot_visibility"]``. Returns
    ``True``; errors from the request or from decoding its reply propagate.
    """
    max_entries = 20
    truncated = len(blocks) > max_entries

    if truncated:
        # Was blocks[0:19], which kept only 19 entries although the notice
        # below announces 20.
        blocks = blocks[:max_entries]

    parts = [instance + " has blocked the following instances:\n\n"]

    for block in blocks:
        reason = block["reason"]

        if reason is None or reason == '':
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > 420:
            # Shorten a local copy rather than writing back into the
            # caller's dict.
            reason = reason[0:419] + "[…]"

        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append("(the list has been truncated to the first 20 entries)")

    message = "".join(parts)

    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    # The decoded reply is not used; .json() is kept so that a non-JSON
    # answer still raises as before.
    reqto.post(f"{config['bot_instance']}/api/v1/statuses",
        data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
        headers=botheaders, timeout=10).json()

    return True
294
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the block lists shown on ``https://{domain}/about/more``.

    Every ``<h3>`` whose text (after translation to its English form) names
    a known section is matched with the first ``<table>`` following it; each
    table row after the first becomes a dict with the keys "domain", "hash"
    and "reason".

    Returns a dict with the keys "reject", "media_removal" and
    "followers_only" (limited and silenced servers combined), or ``{}`` when
    the page cannot be fetched or parsed.
    """
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    # Localised or older section headings -> canonical key of `blocks`.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/about/more", headers=headers, timeout=config["timeout"]).text,
            "html.parser",
        )
    except Exception as exc:
        # `Exception` rather than a bare except so Ctrl-C still interrupts.
        print("ERROR: Cannot fetch from domain:", domain, exc)
        return {}

    for header in doc.find_all("h3"):
        section = translations.get(header.text, header.text)

        if section not in blocks:
            continue

        # find_all_next (not find_next_siblings) to account for instances
        # that e.g. hide the lists in a dropdown menu.
        tables = header.find_all_next("table")
        if not tables:
            # Heading without any table after it: nothing to collect.
            continue

        for row in tables[0].find_all("tr")[1:]:
            try:
                entry = {
                    "domain": row.find("span").text,
                    # presumably drops a 9-character label in front of the
                    # digest - TODO confirm against a live page
                    "hash": row.find("span")["title"][9:],
                    "reason": row.find_all("td")[1].text.strip(),
                }
            except (AttributeError, TypeError, KeyError, IndexError):
                # Row lacks the expected <span title=...> or second cell;
                # skip it instead of failing the whole scrape.
                continue

            blocks[section].append(entry)

    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
    }
353
def get_friendica_blocks(domain: str) -> dict:
    """Scrape the block list shown on ``https://{domain}/friendica``.

    The table inside the element with id "about_blocklist" is read; each row
    after the first becomes a dict with the keys "domain" and "reason".

    Returns ``{"reject": [...]}``, or ``{}`` when the page cannot be fetched
    or contains no block list table.
    """
    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/friendica", headers=headers, timeout=config["timeout"]).text,
            "html.parser",
        )
    except Exception as exc:
        # `Exception` rather than a bare except so Ctrl-C still interrupts.
        print("WARNING: Failed to fetch /friendica from domain:", domain, exc)
        return {}

    blocklist = doc.find(id="about_blocklist")

    # Instance publishes no block list.
    if blocklist is None:
        return {}

    table = blocklist.find("table")

    # Block list section present but without a table: treat as "no list".
    if table is None:
        return {}

    blocks = []
    for row in table.find_all("tr")[1:]:
        cells = row.find_all("td")

        if len(cells) < 2:
            # Malformed row; skip it instead of raising IndexError.
            continue

        blocks.append({
            "domain": cells[0].text.strip(),
            "reason": cells[1].text.strip()
        })

    return {
        "reject": blocks
    }
384
def _fetch_misskey_instances(domain: str, path: str, flag: str, marker: str, step: int = 99) -> list:
    """Page through a federation listing of *domain* and collect flagged hosts.

    *flag* is the request filter set to true ("suspended" or "blocked");
    *marker* is the per-entry field that must be truthy for the entry to be
    kept ("isSuspended" or "isBlocked"). Pages of *step* entries are requested
    until a page is empty, shorter than *step*, or an error occurs; whatever
    was collected up to that point is returned.
    """
    found = []
    offset = 0

    while True:
        payload = {
            "sort": "+caughtAt",
            "host": None,
            flag: True,
            "limit": step,
        }

        if offset > 0:
            # Skip exactly the entries already fetched. The previous
            # `counter-1` re-fetched the last entry of the first page.
            payload["offset"] = offset

        try:
            doc = post_json_api(domain, path, json.dumps(payload))

            # None (request failed) or an empty page: nothing more to read.
            if not doc:
                break

            for instance in doc:
                # Double-check the flag even though the request filters on it.
                if instance[marker]:
                    found.append({
                        "domain": instance["host"],
                        # The API provides no reason field.
                        "reason": ""
                    })

            page_size = len(doc)

        except Exception as exc:
            # Unexpected reply shape or request error: keep what we have.
            # `Exception` rather than a bare except so SystemExit and Ctrl-C
            # are not swallowed.
            print("WARNING: Caught error, exiting loop:", domain, exc)
            break

        if page_size < step:
            break

        offset += step

    return found

def get_misskey_blocks(domain: str) -> dict:
    """Fetch the suspended and blocked instance lists of a Misskey *domain*.

    Returns a dict with the keys "reject" (entries flagged as blocked) and
    "followers_only" (entries flagged as suspended). Each entry is a dict
    with the keys "domain" and "reason"; the reason is always empty. A
    listing that fails part-way contributes the entries read so far.

    Each listing is paged from offset 0. Previously one counter was shared
    by both loops, so the second listing could start at the offset where
    the first one had stopped.
    """
    # Same request order as before: suspended first, then blocked. The two
    # paths differ only by the trailing slash, which is kept as it was.
    suspended = _fetch_misskey_instances(domain, "/api/federation/instances/", "suspended", "isSuspended")
    blocked = _fetch_misskey_instances(domain, "/api/federation/instances", "blocked", "isBlocked")

    return {
        "reject": blocked,
        "followers_only": suspended
    }
499
def tidyup(domain: str) -> str:
    """Normalise a block list entry to a bare, lower-case host name.

    Removes, in this order: an ``http:``/``https:`` scheme (with any number
    of slashes), one trailing slash, a trailing ``:port``, a leading "@",
    and everything up to and including the last remaining "@".

    The scheme and the trailing slash are removed before the port so that
    entries such as ``https://example.com:443/`` lose their port as well.
    """
    # Entries show up in arbitrary case.
    domain = domain.lower()

    # Scheme, sometimes written without slashes ("https:example.com").
    domain = re.sub(r"^https?:/*", "", domain)

    # Trailing slash.
    domain = re.sub(r"/$", "", domain)

    # Port number.
    domain = re.sub(r":\d+$", "", domain)

    # Leading "@" ("@example.com").
    domain = re.sub(r"^@", "", domain)

    # Individual accounts ("user@example.com"): keep only the host part.
    domain = re.sub(r".+@", "", domain)

    return domain