]> git.mxchange.org Git - fba.git/blob - fba.py
3136ea235d64fb157b3c3e1b80d9148c6a80f38d
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9 import time
10
11 with open("config.json") as f:
12     config = json.loads(f.read())
13
14 blacklist = [
15     "activitypub-troll.cf",
16     "gab.best",
17     "4chan.icu",
18     "social.shrimpcam.pw",
19     "mastotroll.netz.org",
20     "ngrok.io",
21 ]
22
23 headers = {
24     "user-agent": config["useragent"]
25 }
26
27 conn = sqlite3.connect("blocks.db")
28 c = conn.cursor()
29
30 def get_hash(domain: str) -> str:
31     # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
32     return sha256(domain.encode("utf-8")).hexdigest()
33
34 def update_last_blocked(domain: str):
35     # NOISY-DEBUG: print("DEBUG: Updating last_blocked for domain", domain)
36
37     try:
38         c.execute("UPDATE instances SET last_blocked = ? WHERE domain = ?", [
39             time.time(),
40             domain
41         ])
42
43     except:
44         print("ERROR: failed SQL query:", domain)
45         sys.exit(255)
46
47 def update_last_error(domain: str, res: any):
48     # NOISY-DEBUG: print("DEBUG: domain,res.status_code", domain, res.status_code)
49
50     try:
51         c.execute("UPDATE instances SET last_status_code = ? WHERE domain = ?", [
52             res.status_code,
53             domain
54         ])
55
56     except:
57         print("ERROR: failed SQL query:", domain)
58         sys.exit(255)
59
60 def update_last_nodeinfo(domain: str):
61     # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
62
63     try:
64         c.execute("UPDATE instances SET last_nodeinfo = ? WHERE domain = ?", [
65             time.time(),
66             domain
67         ])
68
69     except:
70         print("ERROR: failed SQL query:", domain)
71         sys.exit(255)
72
73 def get_peers(domain: str) -> list:
74     # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
75     peers = None
76
77     try:
78         res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=config["timeout"])
79
80         if not res.ok or res.status_code >= 400:
81             print("WARNING: Cannot fetch peers:", domain)
82             update_last_error(domain, res)
83         else:
84             # NOISY-DEBUG: print("DEBUG: Querying API was successful:", domain, len(res.json()))
85             peers = res.json()
86
87     except:
88         print("WARNING: Some error during get():", domain)
89
90     update_last_nodeinfo(domain)
91
92     # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
93     return peers
94
95 def post_json_api(domain: str, path: str, data: str) -> list:
96     try:
97         # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
98         res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=config["timeout"])
99
100         if not res.ok or res.status_code >= 400:
101             print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
102             update_last_error(domain, res)
103             raise
104
105         update_last_nodeinfo(domain)
106         json = res.json()
107     except:
108         print("WARNING: Some error during post():", domain, path, data)
109
110     # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
111     return json
112
113 def fetch_nodeinfo(domain: str) -> list:
114     # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
115
116     requests = [
117        f"https://{domain}/nodeinfo/2.1.json",
118        f"https://{domain}/nodeinfo/2.0",
119        f"https://{domain}/nodeinfo/2.0.json",
120        f"https://{domain}/nodeinfo/2.1",
121        f"https://{domain}/api/v1/instance"
122     ]
123
124     json = None
125     for request in requests:
126         # NOISY-DEBUG: print("DEBUG: Fetching request:", request)
127         res = reqto.get(request, headers=headers, timeout=config["timeout"])
128
129         # NOISY-DEBUG: print("DEBUG: res.ok,res.json[]:", res.ok, type(res.json()))
130         if res.ok and res.json() is not None:
131             # NOISY-DEBUG: print("DEBUG: Success:", request)
132             json = res.json()
133             break
134         elif not res.ok or res.status_code >= 400:
135             # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
136             update_last_error(domain, res)
137             continue
138
139     if json is None:
140         print("WARNING: Failed fetching nodeinfo from domain:", domain)
141
142     # NOISY-DEBUG: print("DEBUG: Updating last_nodeinfo for domain:", domain)
143     update_last_nodeinfo(domain)
144
145     # NOISY-DEBUG: print("DEBUG: Returning json():", len(json))
146     return json
147
148 def determine_software(domain: str) -> str:
149     # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
150     software = None
151
152     try:
153         json = fetch_nodeinfo(domain)
154         # NOISY-DEBUG: print("DEBUG: json():", len(json))
155
156         if json["software"]["name"] in ["akkoma", "rebased"]:
157             # NOISY-DEBUG: print("DEBUG: Setting pleroma:", domain, json["software"]["name"])
158             software = "pleroma"
159         elif json["software"]["name"] in ["hometown", "ecko"]:
160             # NOISY-DEBUG: print("DEBUG: Setting mastodon:", domain, json["software"]["name"])
161             software = "mastodon"
162         elif json["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
163             # NOISY-DEBUG: print("DEBUG: Setting misskey:", domain, json["software"]["name"])
164             software = "misskey"
165         else:
166             # NOISY-DEBUG: print("DEBUG: Using name:", domain, json["software"]["name"])
167             software = json["software"]["name"]
168
169     except:
170         print("WARNING: Could not determine software type:", domain)
171
172     # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
173     return software
174
175 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
176     # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
177     try:
178         c.execute(
179             "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
180             (
181                 reason,
182                 blocker,
183                 blocked,
184                 block_level
185             ),
186         )
187
188     except:
189         print("ERROR: failed SQL query:", reason, blocker, blocked, block_level)
190         sys.exit(255)
191
192 def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
193     # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
194     try:
195         c.execute(
196             "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
197             (
198                 last_seen,
199                 blocker,
200                 blocked,
201                 block_level
202             )
203         )
204
205     except:
206         print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level)
207         sys.exit(255)
208
209 def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
210     # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level,first_added,last_seen:", blocker, blocked, reason, block_level, first_added, last_seen)
211     if blocker.find("@") > 0:
212         print("WARNING: Bad blocker:", blocker)
213         raise
214     elif blocked.find("@") > 0:
215         print("WARNING: Bad blocked:", blocked)
216         raise
217
218     print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
219     try:
220         c.execute(
221             "INSERT INTO blocks (blocker, blocked, reason, block_level, first_added, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
222              (
223                  blocker,
224                  blocked,
225                  reason,
226                  block_level,
227                  first_added,
228                  last_seen
229              ),
230         )
231
232     except:
233         print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen)
234         sys.exit(255)
235
236 def add_instance(domain: str, origin: str, originator: str):
237     # NOISY-DEBUG: print("DEBUG: domain,origin:", domain, origin, originator)
238     if domain.find("@") > 0:
239         print("WARNING: Bad domain name:", domain)
240         raise
241     elif origin is not None and origin.find("@") > 0:
242         print("WARNING: Bad origin name:", origin)
243         raise
244
245     print(f"--- Adding new instance {domain} (origin: {origin})")
246     try:
247         c.execute(
248             "INSERT INTO instances (domain,origin,originator,hash,software) VALUES (?, ?, ?, ?, ?)",
249             (
250                domain,
251                origin,
252                originator,
253                get_hash(domain),
254                determine_software(domain)
255             ),
256         )
257
258     except:
259         print("ERROR: failed SQL query:", domain)
260         sys.exit(255)
261
262 def send_bot_post(instance: str, blocks: dict):
263     message = instance + " has blocked the following instances:\n\n"
264     truncated = False
265
266     if len(blocks) > 20:
267         truncated = True
268         blocks = blocks[0 : 19]
269
270     for block in blocks:
271         if block["reason"] == None or block["reason"] == '':
272             message = message + block["blocked"] + " with unspecified reason\n"
273         else:
274             if len(block["reason"]) > 420:
275                 block["reason"] = block["reason"][0:419] + "[…]"
276
277             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
278
279     if truncated:
280         message = message + "(the list has been truncated to the first 20 entries)"
281
282     botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
283
284     req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
285         data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
286         headers=botheaders, timeout=10).json()
287
288     return True
289
290 def get_mastodon_blocks(domain: str) -> dict:
291     # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
292     blocks = {
293         "Suspended servers": [],
294         "Filtered media": [],
295         "Limited servers": [],
296         "Silenced servers": [],
297     }
298
299     translations = {
300         "Silenced instances": "Silenced servers",
301         "Suspended instances": "Suspended servers",
302         "Gesperrte Server": "Suspended servers",
303         "Gefilterte Medien": "Filtered media",
304         "Stummgeschaltete Server": "Silenced servers",
305         "停止済みのサーバー": "Suspended servers",
306         "メディアを拒否しているサーバー": "Filtered media",
307         "サイレンス済みのサーバー": "Silenced servers",
308         "שרתים מושעים": "Suspended servers",
309         "מדיה מסוננת": "Filtered media",
310         "שרתים מוגבלים": "Silenced servers",
311         "Serveurs suspendus": "Suspended servers",
312         "Médias filtrés": "Filtered media",
313         "Serveurs limités": "Silenced servers",
314     }
315
316     try:
317         doc = BeautifulSoup(
318             reqto.get(f"https://{domain}/about/more", headers=headers, timeout=config["timeout"]).text,
319             "html.parser",
320         )
321     except:
322         print("ERROR: Cannot fetch from domain:", domain)
323         return {}
324
325     for header in doc.find_all("h3"):
326         header_text = header.text
327
328         if header_text in translations:
329             header_text = translations[header_text]
330
331         if header_text in blocks:
332             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
333             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
334                 blocks[header_text].append(
335                     {
336                         "domain": line.find("span").text,
337                         "hash": line.find("span")["title"][9:],
338                         "reason": line.find_all("td")[1].text.strip(),
339                     }
340                 )
341
342     # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
343     return {
344         "reject": blocks["Suspended servers"],
345         "media_removal": blocks["Filtered media"],
346         "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
347     }
348
349 def get_friendica_blocks(domain: str) -> dict:
350     # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
351     blocks = []
352
353     try:
354         doc = BeautifulSoup(
355             reqto.get(f"https://{domain}/friendica", headers=headers, timeout=config["timeout"]).text,
356             "html.parser",
357         )
358     except:
359         print("WARNING: Failed to fetch /friendica from domain:", domain)
360         return {}
361
362     blocklist = doc.find(id="about_blocklist")
363
364     # Prevents exceptions:
365     if blocklist is None:
366         # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
367         return {}
368
369     for line in blocklist.find("table").find_all("tr")[1:]:
370         blocks.append({
371             "domain": line.find_all("td")[0].text.strip(),
372             "reason": line.find_all("td")[1].text.strip()
373         })
374
375     # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
376     return {
377         "reject": blocks
378     }
379
380 def get_misskey_blocks(domain: str) -> dict:
381     # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
382     blocks = {
383         "suspended": [],
384         "blocked": []
385     }
386
387     try:
388         counter = 0
389         step = 99
390         while True:
391             # iterating through all "suspended" (follow-only in its terminology)
392             # instances page-by-page, since that troonware doesn't support
393             # sending them all at once
394             try:
395                 if counter == 0:
396                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
397                     doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
398                         "sort": "+caughtAt",
399                         "host": None,
400                         "suspended": True,
401                         "limit": step
402                     }))
403                 else:
404                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
405                     doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
406                         "sort": "+caughtAt",
407                         "host": None,
408                         "suspended": True,
409                         "limit": step,
410                         "offset": counter-1
411                     }))
412
413                 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
414                 if len(doc) == 0:
415                     # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
416                     break
417
418                 for instance in doc:
419                     # just in case
420                     if instance["isSuspended"]:
421                         blocks["suspended"].append(
422                             {
423                                 "domain": instance["host"],
424                                 # no reason field, nothing
425                                 "reason": ""
426                             }
427                         )
428
429                 if len(doc) < step:
430                     # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
431                     break
432
433                 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
434                 counter = counter + step
435
436             except:
437                 print("WARNING: Caught error, exiting loop:", domain)
438                 counter = 0
439                 break
440
441         while True:
442             # same shit, different asshole ("blocked" aka full suspend)
443             try:
444                 if counter == 0:
445                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
446                     doc = post_json_api(domain,"/api/federation/instances", json.dumps({
447                         "sort": "+caughtAt",
448                         "host": None,
449                         "blocked": True,
450                         "limit": step
451                     }))
452                 else:
453                     # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,counter:", domain, step, counter)
454                     doc = post_json_api(domain,"/api/federation/instances", json.dumps({
455                         "sort": "+caughtAt",
456                         "host": None,
457                         "blocked": True,
458                         "limit": step,
459                         "offset": counter-1
460                     }))
461
462                 # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
463                 if len(doc) == 0:
464                     # NOISY-DEBUG: print("DEBUG: Returned zero bytes, exiting loop:", domain)
465                     break
466
467                 for instance in doc:
468                     if instance["isBlocked"]:
469                         blocks["blocked"].append({
470                             "domain": instance["host"],
471                             "reason": ""
472                         })
473
474                 if len(doc) < step:
475                     # NOISY-DEBUG: print("DEBUG: End of request:", len(doc), step)
476                     break
477
478                 # NOISY-DEBUG: print("DEBUG: Raising counter by step:", step)
479                 counter = counter + step
480
481             except:
482                 counter = 0
483                 break
484
485         # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
486         return {
487             "reject": blocks["blocked"],
488             "followers_only": blocks["suspended"]
489         }
490
491     except:
492         print("WARNING: API request failed for domain:", domain)
493         return {}
494
495 def tidyup(domain: str) -> str:
496     # some retards put their blocks in variable case
497     domain = domain.lower()
498
499     # other retards put the port
500     domain = re.sub("\:\d+$", "", domain)
501
502     # bigger retards put the schema in their blocklist, sometimes even without slashes
503     domain = re.sub("^https?\:(\/*)", "", domain)
504
505     # and trailing slash
506     domain = re.sub("\/$", "", domain)
507
508     # and the @
509     domain = re.sub("^\@", "", domain)
510
511     # the biggest retards of them all try to block individual users
512     domain = re.sub("(.+)\@", "", domain)
513
514     return domain