]> git.mxchange.org Git - fba.git/blob - fba.py
8c90f77cac57ee37689222637d66e208c804fc71
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9 import time
10
# Runtime settings are read once, at import time, from config.json in the
# current working directory.
with open("config.json") as f:
    config = json.load(f)

# Blacklisted domains. How the list is applied is decided by code outside
# this section of the file.
blacklist = [
    "activitypub-troll.cf",
    "gab.best",
    "4chan.icu",
    "social.shrimpcam.pw",
    "mastotroll.netz.org",
    "ngrok.io",
]

# HTTP headers attached to the outgoing requests made by this module.
headers = {"user-agent": config["useragent"]}

# Shared SQLite connection and cursor used by the query helpers in this file.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
29
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of *domain* encoded as UTF-8."""
    # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
    digest = sha256()
    digest.update(domain.encode("utf-8"))
    return digest.hexdigest()
33
def get_peers(domain: str) -> list:
    """Fetch the peer list published by *domain*.

    Sends a GET request to ``https://<domain>/api/v1/instance/peers`` with the
    module-wide ``headers`` and a 5 second timeout.

    Returns:
        The decoded JSON body of the response, or ``None`` when the request
        or the JSON decoding failed (a warning is printed in that case).
    """
    # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
    peers = None

    try:
        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
        peers = res.json()
    except Exception:
        # Best effort: an unreachable or misbehaving instance must not stop
        # the caller. KeyboardInterrupt/SystemExit are deliberately not
        # swallowed here.
        print("WARNING: Cannot fetch peers:", domain)

    # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
    return peers
46
def post_json_api(domain: str, path: str, data: str) -> list:
    """POST *data* to ``https://<domain><path>`` and return the decoded JSON reply.

    Args:
        domain: Host name of the instance to query.
        path: Request path, appended verbatim to the host name.
        data: Request body, passed to ``reqto.post`` as ``data``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: The response status was not OK. This is the same
            exception type the previous bare ``raise`` produced, now with a
            meaningful message.
    """
    # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
    res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5)

    if not res.ok:
        print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
        raise RuntimeError(
            f"Cannot query JSON API: https://{domain}{path} returned HTTP {res.status_code}"
        )

    doc = res.json()
    # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
    return doc
58
def fetch_nodeinfo(domain: str) -> dict:
    """Fetch instance metadata for *domain*.

    The candidate URLs are tried in order; the first one that answers with an
    OK status and a non-null JSON body wins.

    Returns:
        The decoded JSON document, or ``None`` when every candidate failed
        (a warning is printed in that case).
    """
    # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)

    urls = [
        f"https://{domain}/nodeinfo/2.1.json",
        f"https://{domain}/nodeinfo/2.0",
        f"https://{domain}/nodeinfo/2.0.json",
        f"https://{domain}/nodeinfo/2.1",
        f"https://{domain}/api/v1/instance",
    ]

    nodeinfo = None
    for url in urls:
        # NOISY-DEBUG: print("DEBUG: Fetching request:", url)
        try:
            res = reqto.get(url, headers=headers, timeout=5)
            if not res.ok:
                continue

            # Decode the body only once; a decoding error simply moves on to
            # the next candidate URL.
            nodeinfo = res.json()
        except Exception:
            # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
            continue

        if nodeinfo is not None:
            # NOISY-DEBUG: print("DEBUG: Success:", url)
            break

    if nodeinfo is None:
        print("WARNING: Failed fetching nodeinfo from domain:", domain)

    return nodeinfo
94
def determine_software(domain: str) -> str:
    """Return the normalized software name reported by *domain*.

    The name is read from ``["software"]["name"]`` of the document returned
    by ``fetch_nodeinfo``. Known forks are mapped onto the name of the
    software they are treated as; any other name is returned unchanged.

    Returns:
        The (possibly remapped) software name, or ``None`` when no document
        could be fetched or it does not carry a software name.
    """
    # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)

    # Fork name -> name it is handled as.
    aliases = {
        "akkoma": "pleroma",
        "rebased": "pleroma",
        "hometown": "mastodon",
        "ecko": "mastodon",
        "calckey": "misskey",
        "groundpolis": "misskey",
        "foundkey": "misskey",
        "cherrypick": "misskey",
    }

    nodeinfo = fetch_nodeinfo(domain)

    try:
        name = nodeinfo["software"]["name"]
        software = aliases.get(name, name)
    except (KeyError, TypeError):
        # fetch_nodeinfo returned None, or a document without the expected
        # "software" -> "name" structure.
        print("WARNING: Cannot determine software for domain:", domain)
        return None

    # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
    return software
116
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Store *reason* for an existing block that has no reason yet.

    Only rows matching *blocker*, *blocked* and *block_level* whose ``reason``
    column is currently an empty string are updated. The statement runs on
    the module-level cursor ``c``; no commit is issued here.

    On a database error the failure is printed and the process exits with
    status 255.
    """
    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
    try:
        c.execute(
            "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
            (reason, blocker, blocked, block_level),
        )
    except sqlite3.Error as error:
        # Only genuine database failures are reported as SQL errors; an
        # interrupt (Ctrl-C) is no longer mislabelled as one.
        print("ERROR: failed SQL query:", reason, blocker, blocked, block_level, error)
        sys.exit(255)
133
def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
    """Set the ``last_seen`` column of an existing block to *last_seen*.

    All rows matching *blocker*, *blocked* and *block_level* are updated. The
    statement runs on the module-level cursor ``c``; no commit is issued here.

    On a database error the failure is printed and the process exits with
    status 255.
    """
    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
    try:
        c.execute(
            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
            (last_seen, blocker, blocked, block_level),
        )
    except sqlite3.Error as error:
        # Only genuine database failures are reported as SQL errors; an
        # interrupt (Ctrl-C) is no longer mislabelled as one.
        print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level, error)
        sys.exit(255)
150
def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
    """Insert a new row into the ``blocks`` table.

    Args:
        blocker: Domain that issued the block.
        blocked: Domain that is being blocked.
        reason: Reason text stored with the block.
        block_level: Block level stored with the block.
        first_added: Value stored in the fifth column of the new row.
        last_seen: Value stored in the sixth column of the new row.

    Raises:
        RuntimeError: *blocker* or *blocked* contains an ``@`` after its
            first character, i.e. looks like an account handle rather than a
            domain.

    On a database error the failure is printed and the process exits with
    status 255. The statement runs on the module-level cursor ``c``; no
    commit is issued here.
    """
    # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level,first_added,last_seen:", blocker, blocked, reason, block_level, first_added, last_seen)

    # NOTE(review): find() > 0 lets a value that *starts* with "@" through;
    # kept as-is so existing callers see no behavior change -- confirm whether
    # that is intended.
    if blocker.find("@") > 0:
        print("WARNING: Bad blocker:", blocker)
        raise RuntimeError(f"Bad blocker: {blocker}")
    if blocked.find("@") > 0:
        print("WARNING: Bad blocked:", blocked)
        raise RuntimeError(f"Bad blocked: {blocked}")

    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
    try:
        c.execute(
            "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
            (blocker, blocked, reason, block_level, first_added, last_seen),
        )
    except sqlite3.Error as error:
        print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen, error)
        sys.exit(255)
177
def add_instance(domain: str):
    """Insert *domain* into the ``instances`` table.

    The new row holds the domain, its ``get_hash`` digest, the result of
    ``determine_software`` and the current ``time.time()`` value.

    Raises:
        RuntimeError: *domain* contains an ``@`` after its first character,
            i.e. looks like an account handle rather than a domain.

    Any other failure while gathering the values or running the statement is
    printed and the process exits with status 255. The statement runs on the
    module-level cursor ``c``; no commit is issued here.
    """
    # NOISY-DEBUG: print("DEBUG: domain:", domain)
    if domain.find("@") > 0:
        print("WARNING: Bad domain name:", domain)
        raise RuntimeError(f"Bad domain name: {domain}")

    print("--- Adding new instance:", domain)
    try:
        c.execute(
            "INSERT INTO instances SELECT ?, ?, ?, ?",
            (
                domain,
                get_hash(domain),
                determine_software(domain),
                time.time(),
            ),
        )
    except Exception as error:
        # The guarded section also performs the software lookup, so it is
        # not limited to sqlite3 errors. KeyboardInterrupt/SystemExit are no
        # longer caught and misreported as a failed query.
        print("ERROR: failed SQL query:", domain, error)
        sys.exit(255)
199
def _format_bot_message(instance: str, blocks: list, max_entries: int = 20, max_reason_length: int = 420) -> str:
    """Build the status text announcing the blocks issued by *instance*."""
    parts = [instance + " has blocked the following instances:\n\n"]

    for block in blocks[:max_entries]:
        reason = block["reason"]
        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        # Shorten a local copy only; the caller's dicts stay untouched.
        if len(reason) > max_reason_length:
            reason = reason[:max_reason_length] + "[…]"

        # A zero-width space after "@" is inserted into the quoted reason.
        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if len(blocks) > max_entries:
        parts.append(f"(the list has been truncated to the first {max_entries} entries)")

    return "".join(parts)


def send_bot_post(instance: str, blocks: list):
    """Post a status listing the blocks issued by *instance*.

    Args:
        instance: Domain whose blocks are being announced.
        blocks: Sequence of dicts with the keys ``"blocked"`` and
            ``"reason"``. At most the first 20 entries are listed, and
            reasons longer than 420 characters are shortened. The entries
            themselves are not modified.

    The status is sent to ``<config['bot_instance']>/api/v1/statuses`` using
    the bearer token ``config['bot_token']`` and the visibility
    ``config['bot_visibility']``.

    Returns:
        ``True`` once the request was sent and its reply decoded as JSON.
    """
    message = _format_bot_message(instance, blocks)

    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    response = reqto.post(
        f"{config['bot_instance']}/api/v1/statuses",
        data={"status": message, "visibility": config['bot_visibility'], "content_type": "text/plain"},
        headers=botheaders,
        timeout=10,
    )
    # Decoding is kept so that a non-JSON reply still raises as before; the
    # decoded value itself is not needed.
    response.json()

    return True
227
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the published block list from the ``/about/more`` page of *domain*.

    Every ``<h3>`` heading whose (optionally translated) text names one of
    the known block categories is followed to the next ``<table>``; each
    table row after the first becomes a dict with the keys ``"domain"``,
    ``"hash"`` and ``"reason"``.

    Returns:
        A dict with the keys ``"reject"``, ``"media_removal"`` and
        ``"followers_only"``, each holding a list of such entries, or an
        empty dict when the page could not be fetched.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    # Heading variants mapped onto the category names used in ``blocks``.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        print("ERROR: Cannot fetch from domain:", domain)
        return {}

    for header in doc.find_all("h3"):
        header_text = translations.get(header.text, header.text)
        if header_text not in blocks:
            continue

        # The next table anywhere after the heading is used (not only a
        # sibling) to account for instances that e.g. hide lists in a
        # dropdown menu.
        table = header.find_next("table")
        if table is None:
            continue

        # The first row is skipped (presumably the table header).
        for line in table.find_all("tr")[1:]:
            span = line.find("span")
            cells = line.find_all("td")
            if span is None or len(cells) < 2:
                # Row without the expected markup; ignore it.
                continue

            blocks[header_text].append(
                {
                    "domain": span.text,
                    # The first 9 characters of the title are dropped
                    # (presumably a fixed label in front of the hash).
                    "hash": span.get("title", "")[9:],
                    "reason": cells[1].text.strip(),
                }
            )

    # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
    }
286
def get_friendica_blocks(domain: str) -> dict:
    """Scrape the published block list from the ``/friendica`` page of *domain*.

    The table inside the element with id ``about_blocklist`` is read; each
    row after the first becomes a dict with the keys ``"domain"`` and
    ``"reason"``.

    Returns:
        ``{"reject": [...]}`` with the collected entries, or an empty dict
        when the page could not be fetched or contains no block list.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        print("WARNING: Failed to fetch /friendica from domain:", domain)
        return {}

    blocklist = doc.find(id="about_blocklist")
    if blocklist is None:
        # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    table = blocklist.find("table")
    if table is None:
        # A block list section without a table carries no entries.
        return {}

    blocks = []
    # The first row is skipped (presumably the table header).
    for line in table.find_all("tr")[1:]:
        cells = line.find_all("td")
        if len(cells) < 2:
            # Row without the expected two cells; ignore it.
            continue

        blocks.append({
            "domain": cells[0].text.strip(),
            "reason": cells[1].text.strip(),
        })

    # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
    return {
        "reject": blocks
    }
317
def _fetch_misskey_instances(domain: str, path: str, filter_key: str, flag_key: str, step: int = 99) -> list:
    """Page through the instance listing of *domain* and collect flagged hosts.

    Args:
        domain: Instance to query.
        path: API path handed to ``post_json_api``.
        filter_key: Request field set to ``True`` to select the wanted kind
            of instance (``"suspended"`` or ``"blocked"``).
        flag_key: Field of each returned instance that must be truthy for
            the instance to be collected.
        step: Page size requested per call.

    Returns:
        A list of ``{"domain": host, "reason": ""}`` dicts. When a request
        fails, whatever was collected up to that point is returned.
    """
    found = []
    offset = 0

    while True:
        payload = {
            "sort": "+caughtAt",
            "host": None,
            filter_key: True,
            "limit": step,
        }
        if offset > 0:
            # Continue right after the last entry of the previous page, so
            # no entry is fetched twice.
            payload["offset"] = offset

        try:
            # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
            doc = post_json_api(domain, path, json.dumps(payload))

            # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
            if len(doc) == 0:
                break

            for instance in doc:
                # Re-check the flag on every returned entry.
                if instance[flag_key]:
                    found.append({
                        "domain": instance["host"],
                        # The API response carries no reason field.
                        "reason": "",
                    })

            if len(doc) < step:
                # A short page is the last page.
                break

            offset += step

        except Exception:
            print("WARNING: Caught error, exiting loop:", domain)
            break

    return found


def get_misskey_blocks(domain: str) -> dict:
    """Fetch the suspended and blocked instances of a Misskey-style *domain*.

    Both lists are retrieved page by page through the federation instances
    API. Each listing starts again at the first page, independent of how
    many pages the other listing needed.

    Returns:
        A dict with the keys ``"reject"`` (instances flagged as blocked) and
        ``"followers_only"`` (instances flagged as suspended), or an empty
        dict when an unexpected error occurred.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
    try:
        suspended = _fetch_misskey_instances(
            domain, "/api/federation/instances/", "suspended", "isSuspended"
        )
        blocked = _fetch_misskey_instances(
            domain, "/api/federation/instances", "blocked", "isBlocked"
        )
    except Exception:
        print("WARNING: API request failed for domain:", domain)
        return {}

    # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocked), len(suspended))
    return {
        "reject": blocked,
        "followers_only": suspended
    }
432
def tidyup(domain: str) -> str:
    """Normalize a block list entry into a bare, lower-case domain name.

    Removes, in this order: upper-case letters (by lower-casing), a leading
    ``http:``/``https:`` scheme with any number of slashes, one trailing
    slash, a trailing ``:port``, a leading ``@`` and everything up to and
    including the last remaining ``@`` (so ``user@host`` becomes ``host``).
    """
    # Some lists use mixed case.
    domain = domain.lower()

    # Some entries carry a scheme, occasionally without the slashes.
    domain = re.sub(r"^https?:(/*)", "", domain)

    # ... or a trailing slash.
    domain = re.sub(r"/$", "", domain)

    # The port is stripped only after the trailing slash is gone, so that
    # "host:8080/" is cleaned up as well.
    domain = re.sub(r":\d+$", "", domain)

    # A leading "@" is dropped.
    domain = re.sub(r"^@", "", domain)

    # Entries that name an individual account are reduced to its host.
    domain = re.sub(r"(.+)@", "", domain)

    return domain