]> git.mxchange.org Git - fba.git/blob - fba.py
c78dd6450cb670b919ff25c328461f810048083a
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9 import time
10
# Runtime settings are loaded once at import time; config.json is looked up
# relative to the current working directory.
with open("config.json") as f:
    config = json.load(f)

# Domains singled out by the author. Nothing in this part of the file reads
# the list, so presumably callers consult it to skip these hosts -- verify
# against the scripts that import this module.
blacklist = [
    "activitypub-troll.cf", "gab.best", "4chan.icu",
    "social.shrimpcam.pw", "mastotroll.netz.org", "ngrok.io",
]

# HTTP headers passed along with the outgoing requests made by the helpers
# in this module.
headers = {"user-agent": config["useragent"]}

# One shared SQLite connection and cursor; the SQL helpers in this module
# execute their statements through `c`.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
29
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of *domain* encoded as UTF-8."""
    # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
    encoded = domain.encode("utf-8")
    digest = sha256(encoded)
    return digest.hexdigest()
33
def update_last_access(domain: str):
    """Stamp the ``instances`` row of *domain* with the current time.

    The UPDATE is executed through the module-level cursor ``c``; no commit
    is issued here. If the statement fails with a database error, the error
    is printed and the process exits with status 255.
    """
    # NOISY-DEBUG: print("DEBUG: Updating last_access for domain:", domain)
    try:
        c.execute(
            "UPDATE instances SET last_access = ? WHERE domain = ?",
            (time.time(), domain),
        )
    except sqlite3.Error as error:
        # Only database errors are treated as fatal. A bare ``except`` would
        # also trap KeyboardInterrupt/SystemExit and hide the real cause.
        print("ERROR: failed SQL query:", domain, error)
        sys.exit(255)
46
def get_peers(domain: str) -> "list | None":
    """Fetch the peer list published by *domain*.

    Requests ``https://{domain}/api/v1/instance/peers`` and returns the
    decoded JSON body (presumably a list of domain names -- the shape is not
    checked here). When a non-null body was decoded, the instance's
    ``last_access`` timestamp is refreshed.

    Returns:
        The decoded JSON, or None when the request or the decoding failed
        (a warning is printed) or the body was JSON ``null``.
    """
    # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
    try:
        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
        peers = res.json()
    except Exception as error:
        # Best-effort fetch: any request/decoding failure just yields None.
        # ``Exception`` (not a bare except) keeps Ctrl-C and sys.exit() working.
        print("WARNING: Cannot fetch peers:", domain, error)
        return None

    if peers is not None:
        update_last_access(domain)

    # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
    return peers
62
def post_json_api(domain: str, path: str, data: str) -> list:
    """POST *data* to ``https://{domain}{path}`` and return the decoded JSON reply.

    On a successful response the instance's ``last_access`` timestamp is
    refreshed before the body is decoded.

    Args:
        domain: Host name of the instance to query.
        path: Absolute URL path of the API endpoint (starting with ``/``).
        data: Request body, passed to the HTTP client unchanged.

    Raises:
        RuntimeError: If the response status is not OK. A warning with the
            status code is printed first.
    """
    # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
    res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5)

    if not res.ok:
        print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
        # Raise explicitly: a bare ``raise`` only produced a RuntimeError by
        # accident and would re-raise an unrelated exception if this function
        # were called from inside an exception handler.
        raise RuntimeError(f"Cannot query JSON API: https://{domain}{path} returned status {res.status_code}")

    update_last_access(domain)

    doc = res.json()
    # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
    return doc
76
def fetch_nodeinfo(domain: str) -> "dict | None":
    """Fetch a software description document for *domain*.

    Several well-known URLs are tried in order and the first one that answers
    with an OK status and a non-null JSON body wins. On success the
    instance's ``last_access`` timestamp is refreshed.

    Returns:
        The decoded JSON document (determine_software() indexes it as a
        mapping), or None when every URL failed; a warning is printed then.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching nodeinfo from domain:", domain)
    urls = [
        f"https://{domain}/nodeinfo/2.1.json",
        f"https://{domain}/nodeinfo/2.0",
        f"https://{domain}/nodeinfo/2.0.json",
        f"https://{domain}/nodeinfo/2.1",
        f"https://{domain}/api/v1/instance",
    ]

    # Named ``nodeinfo`` rather than ``json`` so the imported json module is
    # not shadowed inside this function.
    nodeinfo = None
    for url in urls:
        # NOISY-DEBUG: print("DEBUG: Fetching request:", url)
        try:
            res = reqto.get(url, headers=headers, timeout=5)
            # Decode the body only once, and only for OK responses.
            data = res.json() if res.ok else None
        except Exception:
            # Request or decoding failed: try the next candidate URL.
            # NOISY-DEBUG: print("DEBUG: Failed fetching nodeinfo from domain:", domain)
            continue

        if data is not None:
            # NOISY-DEBUG: print("DEBUG: Success:", url)
            nodeinfo = data
            break

    if nodeinfo is None:
        print("WARNING: Failed fetching nodeinfo from domain:", domain)
    else:
        update_last_access(domain)

    return nodeinfo
114
def determine_software(domain: str) -> str:
    """Return the software family that *domain* runs.

    The name is read from ``["software"]["name"]`` of the document returned
    by fetch_nodeinfo(). Known forks are reported under the name of the
    family they are grouped with; any other name is returned unchanged.

    NOTE(review): fetch_nodeinfo() can return None, and this function does
    not guard against that, so the lookup below raises in that case.
    """
    # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
    # (family, forks filed under that family)
    families = (
        ("pleroma", ("akkoma", "rebased")),
        ("mastodon", ("hometown", "ecko")),
        ("misskey", ("calckey", "groundpolis", "foundkey", "cherrypick")),
    )

    nodeinfo = fetch_nodeinfo(domain)
    reported = nodeinfo["software"]["name"]

    for family, forks in families:
        if reported in forks:
            # NOISY-DEBUG: print("DEBUG: Mapping fork to family:", domain, reported, family)
            return family

    # NOISY-DEBUG: print("DEBUG: Using name:", domain, reported)
    return reported
136
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Fill in the reason of an existing block that has none yet.

    Only rows matching *blocker*, *blocked* and *block_level* whose stored
    reason is the empty string are updated. The statement runs through the
    module-level cursor ``c``; no commit is issued here. A database error is
    printed and terminates the process with status 255.
    """
    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
    try:
        c.execute(
            "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
            (reason, blocker, blocked, block_level),
        )
    except sqlite3.Error as error:
        # Only database errors are fatal here; a bare ``except`` would also
        # trap KeyboardInterrupt/SystemExit and hide the actual cause.
        print("ERROR: failed SQL query:", reason, blocker, blocked, block_level, error)
        sys.exit(255)
153
def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
    """Set ``last_seen`` on the block identified by blocker, blocked and level.

    The statement runs through the module-level cursor ``c``; no commit is
    issued here. A database error is printed and terminates the process with
    status 255.
    """
    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
    try:
        c.execute(
            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
            (last_seen, blocker, blocked, block_level),
        )
    except sqlite3.Error as error:
        # Only database errors are fatal here; a bare ``except`` would also
        # trap KeyboardInterrupt/SystemExit and hide the actual cause.
        print("ERROR: failed SQL query:", last_seen, blocker, blocked, block_level, error)
        sys.exit(255)
170
def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
    """Insert a new row into ``blocks``.

    The values are inserted positionally in the order blocker, blocked,
    reason, block_level, first_added, last_seen through the module-level
    cursor ``c``; no commit is issued here.

    Raises:
        RuntimeError: If *blocker* or *blocked* contains an "@" after its
            first character (i.e. looks like an account, not a domain).

    A database error is printed and terminates the process with status 255.
    """
    # NOISY-DEBUG: print("DEBUG: blocker,blocked,reason,block_level,first_added,last_seen:", blocker, blocked, reason, block_level, first_added, last_seen)
    # NOTE(review): ``find("@") > 0`` lets a leading "@" (index 0) through --
    # confirm whether that is intended.
    if blocker.find("@") > 0:
        print("WARNING: Bad blocker:", blocker)
        # Explicit exception instead of a bare ``raise``, which would re-raise
        # an unrelated exception when called from inside an except block.
        raise RuntimeError(f"Bad blocker: {blocker}")
    if blocked.find("@") > 0:
        print("WARNING: Bad blocked:", blocked)
        raise RuntimeError(f"Bad blocked: {blocked}")

    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
    try:
        c.execute(
            "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
            (blocker, blocked, reason, block_level, first_added, last_seen),
        )
    except sqlite3.Error as error:
        # Only database errors are fatal here; a bare ``except`` would also
        # trap KeyboardInterrupt/SystemExit and hide the actual cause.
        print("ERROR: failed SQL query:", blocker, blocked, reason, block_level, first_added, last_seen, error)
        sys.exit(255)
197
def add_instance(domain: str):
    """Insert *domain* into ``instances``.

    The row consists of the domain, its SHA-256 hash (get_hash), the software
    family (determine_software, which performs network requests) and the
    current time, inserted positionally through the module-level cursor
    ``c``; no commit is issued here.

    Raises:
        RuntimeError: If *domain* contains an "@" after its first character.

    Any other failure while building or inserting the row is printed and
    terminates the process with status 255.
    """
    # NOISY-DEBUG: print("DEBUG: domain:", domain)
    if domain.find("@") > 0:
        print("WARNING: Bad domain name:", domain)
        # Explicit exception instead of a bare ``raise``, which would re-raise
        # an unrelated exception when called from inside an except block.
        raise RuntimeError(f"Bad domain name: {domain}")

    print("--- Adding new instance:", domain)
    try:
        c.execute(
            "INSERT INTO instances SELECT ?, ?, ?, ?",
            (
                domain,
                get_hash(domain),
                determine_software(domain),
                time.time(),
            ),
        )
    except Exception as error:
        # Kept broad on purpose: the software lookup runs inside this try as
        # well, so its failures end up here too, exactly as before.
        # ``Exception`` still lets KeyboardInterrupt/SystemExit pass through.
        # NOTE(review): a failed nodeinfo lookup is therefore reported as a
        # failed SQL query -- consider separating the two.
        print("ERROR: failed SQL query:", domain, error)
        sys.exit(255)
219
def send_bot_post(instance: str, blocks: list):
    """Post a summary of the blocks *instance* has added via the bot account.

    Args:
        instance: Domain of the blocking instance, used in the headline.
        blocks: Sequence of dicts with the keys ``"blocked"`` and
            ``"reason"``. The dicts are not modified.

    At most 20 entries are listed; a note is appended when the list was cut.
    Reasons longer than 420 characters are shortened, and every "@" in a
    reason is followed by a zero-width space (presumably to avoid creating
    mentions -- confirm).

    The status is posted to ``config["bot_instance"]`` using
    ``config["bot_token"]`` and ``config["bot_visibility"]``.

    Returns:
        Always True.
    """
    max_entries = 20

    truncated = len(blocks) > max_entries
    if truncated:
        # Keep exactly the number of entries the notice below promises.
        blocks = blocks[:max_entries]

    parts = [instance + " has blocked the following instances:\n\n"]
    for block in blocks:
        reason = block["reason"]
        if not reason:
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > 420:
            # Shorten a local copy only, so the caller's dict stays untouched.
            reason = reason[0:419] + "[…]"

        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append("(the list has been truncated to the first 20 entries)")

    message = "".join(parts)

    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    # The decoded reply is not used. The .json() call is kept from the
    # original, presumably so that a non-JSON answer still raises -- confirm.
    reqto.post(
        f"{config['bot_instance']}/api/v1/statuses",
        data={"status": message, "visibility": config['bot_visibility'], "content_type": "text/plain"},
        headers=botheaders,
        timeout=10,
    ).json()

    return True
247
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the block lists from the ``/about/more`` page of *domain*.

    Each ``<h3>`` heading whose text (after translation to the English
    heading) names a known section is paired with the first ``<table>`` that
    follows it in the document; every table row except the first yields one
    entry with the keys ``"domain"``, ``"hash"`` and ``"reason"``.

    Returns:
        A dict with the keys ``"reject"``, ``"media_removal"`` and
        ``"followers_only"``, each holding a list of entries, or an empty
        dict when the page could not be fetched or parsed into a document.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    # Localized / older headings mapped to the section names used above.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        page = reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text
        doc = BeautifulSoup(page, "html.parser")
    except Exception as error:
        # ``Exception`` (not a bare except) keeps Ctrl-C and sys.exit() working.
        print("ERROR: Cannot fetch from domain:", domain, error)
        return {}

    for header in doc.find_all("h3"):
        section = translations.get(header.text, header.text)
        if section not in blocks:
            continue

        # Search everything after the heading, not just its siblings, to
        # account for instances that e.g. hide the lists in a dropdown menu.
        table = header.find_next("table")
        if table is None:
            # Heading without a table after it: nothing to collect.
            continue

        # The first row is skipped (presumably the table's header row).
        for row in table.find_all("tr")[1:]:
            span = row.find("span")
            blocks[section].append(
                {
                    "domain": span.text,
                    # Drops the first 9 characters of the title attribute
                    # (presumably a prefix such as "SHA-256: " -- TODO confirm).
                    "hash": span["title"][9:],
                    "reason": row.find_all("td")[1].text.strip(),
                }
            )

    # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
    }
306
def get_friendica_blocks(domain: str) -> dict:
    """Scrape the block list from the ``/friendica`` page of *domain*.

    The list is read from the table inside the element with the id
    ``about_blocklist``; every row except the first yields one entry whose
    ``"domain"`` and ``"reason"`` are the stripped texts of the first and
    second cell.

    Returns:
        ``{"reject": [entries]}``, or an empty dict when the page could not
        be fetched or contains no block list.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    try:
        page = reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text
        doc = BeautifulSoup(page, "html.parser")
    except Exception as error:
        # ``Exception`` (not a bare except) keeps Ctrl-C and sys.exit() working.
        print("WARNING: Failed to fetch /friendica from domain:", domain, error)
        return {}

    blocklist = doc.find(id="about_blocklist")
    if blocklist is None:
        # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    table = blocklist.find("table")
    if table is None:
        # Block list section without a table: treated like "no block list".
        return {}

    blocks = []
    # The first row is skipped (presumably the table's header row).
    for row in table.find_all("tr")[1:]:
        cells = row.find_all("td")
        blocks.append({
            "domain": cells[0].text.strip(),
            "reason": cells[1].text.strip(),
        })

    # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
    return {
        "reject": blocks
    }
337
def _fetch_misskey_instances(domain: str, path: str, filter_key: str, flag_key: str) -> list:
    """Page through one federation listing of *domain* and collect its hosts.

    Args:
        domain: Instance to query.
        path: API path to POST to.
        filter_key: Request field set to True to select the listing
            (``"suspended"`` or ``"blocked"``).
        flag_key: Field that must be truthy on a returned instance for it to
            be collected (``"isSuspended"`` or ``"isBlocked"``).

    Returns:
        A list of ``{"domain": host, "reason": ""}`` dicts. If a request or
        the handling of a page fails, a warning is printed and whatever was
        collected up to that point is returned.
    """
    step = 99
    offset = 0
    found = []

    while True:
        payload = {
            "sort": "+caughtAt",
            "host": None,
            filter_key: True,
            "limit": step,
        }
        if offset > 0:
            # Skip exactly the entries already received. (offset - 1 would
            # fetch the last entry of the previous page a second time.)
            payload["offset"] = offset

        try:
            # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
            doc = post_json_api(domain, path, json.dumps(payload))

            # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
            if len(doc) == 0:
                break

            for instance in doc:
                # Double-check the flag in case the filter was not applied.
                if instance[flag_key]:
                    found.append({
                        "domain": instance["host"],
                        # The API provides no reason field.
                        "reason": "",
                    })

            if len(doc) < step:
                # Short page: this was the last one.
                break

        except Exception as error:
            # Best effort: keep what was collected so far.
            print("WARNING: Caught error, exiting loop:", domain, error)
            break

        offset += step

    return found


def get_misskey_blocks(domain: str) -> dict:
    """Fetch the suspended and blocked instance lists of a Misskey instance.

    Both lists are retrieved page by page through the federation API. Each
    listing is paged independently, starting at its own first page.

    Returns:
        A dict with ``"reject"`` (instances flagged ``isBlocked``) and
        ``"followers_only"`` (instances flagged ``isSuspended``), each a list
        of ``{"domain": ..., "reason": ""}`` dicts. A list is empty or
        partial when its requests failed.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
    # NOTE(review): the two listings are requested with and without a trailing
    # slash, exactly as before -- confirm whether the difference is intended.
    suspended = _fetch_misskey_instances(domain, "/api/federation/instances/", "suspended", "isSuspended")
    blocked = _fetch_misskey_instances(domain, "/api/federation/instances", "blocked", "isBlocked")

    # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocked), len(suspended))
    return {
        "reject": blocked,
        "followers_only": suspended,
    }
452
def tidyup(domain: str) -> str:
    """Normalize a block list entry to a bare lower-case domain name.

    Removes, in this order: a trailing ``:port``, a leading ``http:`` /
    ``https:`` scheme (with or without slashes), one trailing slash, a port
    that only became trailing after the slash was removed, a leading ``@``,
    and everything up to and including the last remaining ``@``.
    """
    # Block lists contain entries in mixed case.
    domain = domain.lower()

    # Some entries carry a port.
    domain = re.sub(r":\d+$", "", domain)

    # Some entries carry the scheme, sometimes even without slashes.
    domain = re.sub(r"^https?:(/*)", "", domain)

    # ... and a trailing slash.
    domain = re.sub(r"/$", "", domain)

    # A port hidden behind the trailing slash ("host:8080/") only becomes
    # visible now, so strip it again.
    domain = re.sub(r":\d+$", "", domain)

    # Some entries start with an "@".
    domain = re.sub(r"^@", "", domain)

    # Some entries name an individual user (user@host); keep only the host.
    domain = re.sub(r"(.+)@", "", domain)

    return domain