]> git.mxchange.org Git - fba.git/blob - fba.py
aa0db400685aa595974316739fad9282baed643a
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9
# Settings are read once, at import time, from config.json in the current
# working directory. The keys used in this file are "useragent", "bot_token",
# "bot_instance" and "bot_visibility".
with open("config.json") as f:
    config = json.load(f)

# Domains on this list are treated specially by the crawler.
# NOTE(review): the code consuming this list is outside this view - confirm
# how entries are matched (exact domain vs. suffix).
blacklist = [
    "activitypub-troll.cf",
    "gab.best",
    "4chan.icu",
    "social.shrimpcam.pw",
    "mastotroll.netz.org",
    "ngrok.io",
]

# HTTP headers sent with every outgoing request.
headers = {"user-agent": config["useragent"]}

# One shared SQLite connection and cursor, used by the helpers below.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
28
def get_hash(domain: str) -> str:
    """Return the hex-encoded SHA-256 digest of the UTF-8 encoded domain."""
    hasher = sha256()
    hasher.update(domain.encode("utf-8"))
    return hasher.hexdigest()
32
def get_peers(domain: str) -> list:
    """Fetch the peers an instance reports at /api/v1/instance/peers.

    Args:
        domain: Host name of the instance to query (no scheme).

    Returns:
        The decoded JSON body of the response, or None when the request or
        the JSON decoding failed. A warning is printed in the failure case.
    """
    try:
        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
        return res.json()
    except Exception:
        # Best effort: one unreachable or misbehaving instance must not abort
        # the caller. Only Exception is caught so that Ctrl-C and sys.exit()
        # still work while a request is in flight.
        print("WARNING: Cannot fetch peers:", domain)
        return None
45
def post_json_api(domain: str, path: str, data: str) -> list:
    """POST data to https://<domain><path> and return the decoded JSON reply.

    Args:
        domain: Host name of the instance to query (no scheme).
        path: Request path, starting with a slash.
        data: Request body, passed to reqto.post() unchanged.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the response status is not OK. A warning is printed
            before raising.
        Exception: Whatever reqto.post() or the JSON decoding raise.
    """
    res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5)

    if not res.ok:
        print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
        # A bare `raise` with no exception being handled only fails by
        # accident, with "No active exception to re-raise". Raise the same
        # exception type explicitly, with a message naming the real cause.
        raise RuntimeError(
            f"JSON API request to https://{domain}{path} failed with status {res.status_code}"
        )

    return res.json()
57
def determine_software(domain: str) -> str:
    """Work out which server software an instance runs.

    Several nodeinfo URLs are tried in turn. When a nodeinfo document is
    found, its software name is returned, with known forks mapped to the
    software family they belong to. When nodeinfo answers 404, the instance
    is probed at /api/v1/instance and reported as "mastodon" if that
    request succeeds.

    Args:
        domain: Host name of the instance to query (no scheme).

    Returns:
        The software name, or None when it could not be determined. A
        warning is printed when a request or the parsing raised.
    """
    # Forks are reported under the name of the software they derive from.
    families = {
        "akkoma": "pleroma",
        "rebased": "pleroma",
        "hometown": "mastodon",
        "ecko": "mastodon",
        "calckey": "misskey",
        "groundpolis": "misskey",
        "foundkey": "misskey",
        "cherrypick": "misskey",
    }

    software = None

    try:
        res = reqto.get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)

        if res.status_code == 404:
            res = reqto.get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)

        if res.status_code == 404:
            res = reqto.get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)

        # An HTML answer is not a nodeinfo document; try one more URL.
        if res.ok and "text/html" in res.headers["content-type"]:
            res = reqto.get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)

        if res.ok:
            name = res.json()["software"]["name"]
            software = families.get(name, name)
        elif res.status_code == 404:
            # No nodeinfo at all: fall back to probing the instance API.
            # This check must stay inside the fallback branch, otherwise a
            # successful nodeinfo lookup would be overwritten with "mastodon".
            res = reqto.get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
            if res.ok:
                software = "mastodon"
    except Exception:
        # Best effort; Ctrl-C and sys.exit() are deliberately not caught.
        print("WARNING: Failed fetching instance meta data")

    return software
95
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Fill in the reason of a recorded block that has none yet.

    Only rows of the blocks table matching blocker, blocked and block_level
    whose reason is the empty string are updated. The statement runs on the
    module-level cursor; nothing is committed here.

    Args:
        reason: Reason text to store.
        blocker: Domain of the blocking instance.
        blocked: Domain of the blocked instance.
        block_level: Block level of the row to update.

    Terminates the process with exit status 255 when SQLite reports an
    error. Other exceptions propagate unchanged.
    """
    try:
        c.execute(
            "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
            (reason, blocker, blocked, block_level),
        )
    except sqlite3.Error as error:
        # Report what actually went wrong instead of only that it did.
        print("ERROR: failed SQL query:", error)
        sys.exit(255)
112
def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
    """Store a new last_seen value for a recorded block.

    All rows of the blocks table matching blocker, blocked and block_level
    are updated. The statement runs on the module-level cursor; nothing is
    committed here.

    Args:
        last_seen: Value to store in the last_seen column.
        blocker: Domain of the blocking instance.
        blocked: Domain of the blocked instance.
        block_level: Block level of the row to update.

    Terminates the process with exit status 255 when SQLite reports an
    error. Other exceptions propagate unchanged.
    """
    try:
        c.execute(
            "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
            (last_seen, blocker, blocked, block_level),
        )
    except sqlite3.Error as error:
        # Report what actually went wrong instead of only that it did.
        print("ERROR: failed SQL query:", error)
        sys.exit(255)
129
def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
    """Record a new block in the blocks table.

    The six arguments are inserted, in the order given, as one row. The
    statement runs on the module-level cursor; nothing is committed here.

    Args:
        blocker: Domain of the blocking instance.
        blocked: Domain of the blocked instance.
        reason: Reason text, may be empty.
        block_level: Block level of the new row.
        first_added: Value for the first_added column.
        last_seen: Value for the last_seen column.

    Terminates the process with exit status 255 when SQLite reports an
    error. Other exceptions propagate unchanged.
    """
    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)

    try:
        c.execute(
            "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
            (blocker, blocked, reason, block_level, first_added, last_seen),
        )
    except sqlite3.Error as error:
        # Report what actually went wrong instead of only that it did.
        print("ERROR: failed SQL query:", error)
        sys.exit(255)
148
def add_instance(domain: str):
    """Record a newly discovered instance in the instances table.

    The row consists of the domain, its hash from get_hash() and the
    software reported by determine_software(), which performs network
    requests. The statement runs on the module-level cursor; nothing is
    committed here.

    Args:
        domain: Host name of the instance to add.

    Terminates the process with exit status 255 when SQLite reports an
    error. Other exceptions propagate unchanged.
    """
    print("--- Adding new instance:", domain)

    # Gather the row outside the try block, so that only database errors are
    # reported as a failed SQL query.
    row = (
        domain,
        get_hash(domain),
        determine_software(domain),
    )

    try:
        c.execute("INSERT INTO instances SELECT ?, ?, ?", row)
    except sqlite3.Error as error:
        # Report what actually went wrong instead of only that it did.
        print("ERROR: failed SQL query:", error)
        sys.exit(255)
164
def send_bot_post(instance: str, blocks: list):
    """Post a summary of an instance's blocks through the configured bot.

    The status lists at most the first 20 blocks, one per line, and says so
    when the list was cut. Reasons longer than 420 characters are shortened.
    The status is sent to <bot_instance>/api/v1/statuses using the bot token
    and visibility from the configuration.

    Args:
        instance: Domain of the blocking instance.
        blocks: List of dicts with the keys "blocked" and "reason".

    Returns:
        True. Failures of the request or of decoding its reply raise.
    """
    max_entries = 20
    max_reason_length = 420

    truncated = len(blocks) > max_entries

    lines = []
    # Slice to exactly max_entries, matching the notice appended below.
    for block in blocks[:max_entries]:
        # Work on a local copy so the caller's dicts are left untouched.
        reason = block["reason"]

        if not reason:
            lines.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > max_reason_length:
            reason = reason[:max_reason_length - 1] + "[…]"

        # A zero-width space is inserted after every "@", presumably so the
        # receiving server does not turn the text into mentions.
        lines.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    message = instance + " has blocked the following instances:\n\n" + "".join(lines)

    if truncated:
        message = message + "(the list has been truncated to the first 20 entries)"

    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    # The reply is decoded but not used; decoding makes a non-JSON answer
    # surface as an exception.
    reqto.post(
        f"{config['bot_instance']}/api/v1/statuses",
        data={"status": message, "visibility": config['bot_visibility'], "content_type": "text/plain"},
        headers=botheaders,
        timeout=10,
    ).json()

    return True
192
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the block lists from an instance's /about/more page.

    Every <h3> heading whose text is one of the known section titles, in
    English or one of the listed translations, is followed to the next
    <table> in the document. Each row of that table after the first yields
    one entry with the keys "domain", "hash" and "reason".

    Args:
        domain: Host name of the instance to query (no scheme).

    Returns:
        A dict with the keys "reject" (suspended servers), "media_removal"
        (filtered media) and "followers_only" (limited and silenced
        servers), each holding a list of entries. An empty dict when the
        page could not be fetched.
    """
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    # Localised and older section titles, mapped to the keys of `blocks`.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        # Best effort; Ctrl-C and sys.exit() are deliberately not caught.
        print("ERROR: Cannot fetch from domain:", domain)
        return {}

    for header in doc.find_all("h3"):
        header_text = translations.get(header.text, header.text)

        if header_text not in blocks:
            continue

        # Search everything after the heading, not only its siblings, to
        # account for instances that e.g. hide the lists in a dropdown menu.
        table = header.find_next("table")
        if table is None:
            # Heading without any table after it: nothing to collect.
            continue

        # The first row is skipped (presumably the table header).
        for line in table.find_all("tr")[1:]:
            span = line.find("span")
            blocks[header_text].append(
                {
                    "domain": span.text,
                    # The first 9 characters of the title are dropped,
                    # presumably a "SHA-256: " style prefix - TODO confirm.
                    "hash": span["title"][9:],
                    "reason": line.find_all("td")[1].text.strip(),
                }
            )

    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
    }
251
def get_friendica_blocks(domain: str) -> dict:
    """Scrape the block list from an instance's /friendica page.

    The list is read from the table inside the element with the id
    "about_blocklist". Each row after the first yields one entry with the
    keys "domain" and "reason", taken from the first two cells.

    Args:
        domain: Host name of the instance to query (no scheme).

    Returns:
        A dict with the single key "reject" holding the list of entries. An
        empty dict when the page could not be fetched or contains no block
        list.
    """
    try:
        doc = BeautifulSoup(
            reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        # Best effort; Ctrl-C and sys.exit() are deliberately not caught.
        print("WARNING: Failed to fetch /friendica from domain:", domain)
        return {}

    blocklist = doc.find(id="about_blocklist")
    if blocklist is None:
        # The instance does not publish a block list.
        return {}

    table = blocklist.find("table")
    if table is None:
        # Block list section without a table: treated like no block list.
        return {}

    blocks = []
    # The first row is skipped (presumably the table header).
    for line in table.find_all("tr")[1:]:
        cells = line.find_all("td")
        blocks.append({
            "domain": cells[0].text.strip(),
            "reason": cells[1].text.strip(),
        })

    return {
        "reject": blocks
    }
282
def _fetch_misskey_instances(domain: str, path: str, filter_key: str, flag_key: str) -> list:
    """Page through one federation listing and collect the flagged hosts.

    Args:
        domain: Host name of the instance to query (no scheme).
        path: API path to POST to.
        filter_key: Request field set to True to select the listing.
        flag_key: Field of each returned instance that must be truthy for
            the instance to be collected.

    Returns:
        A list of dicts with the keys "domain" and "reason". The reason is
        always empty because the API response is not read for one. When a
        request or the handling of its reply fails, a warning is printed
        and whatever was collected up to that point is returned.
    """
    found = []
    step = 99
    offset = 0

    while True:
        query = {
            "sort": "+caughtAt",
            "host": None,
            filter_key: True,
            "limit": step,
        }
        if offset > 0:
            # Continue directly after the last entry of the previous page.
            query["offset"] = offset

        try:
            doc = post_json_api(domain, path, json.dumps(query))

            for instance in doc:
                # Double-check the flag instead of trusting the filter.
                if instance[flag_key]:
                    found.append({
                        "domain": instance["host"],
                        "reason": "",
                    })

            # A short (or empty) page is the last one.
            if len(doc) < step:
                break
        except Exception:
            # Best effort; Ctrl-C and sys.exit() are deliberately not caught.
            print("WARNING: Caught error, exiting loop:", domain)
            break

        offset += step

    return found


def get_misskey_blocks(domain: str) -> dict:
    """Fetch the block lists of a Misskey-style instance through its API.

    Two listings are fetched page by page, because the API limits how many
    entries one request returns: first the instances marked as suspended,
    then those marked as blocked. Each listing is paged independently,
    starting from the first entry.

    Args:
        domain: Host name of the instance to query (no scheme).

    Returns:
        A dict with the keys "reject" (blocked instances) and
        "followers_only" (suspended instances), each holding a list of
        dicts with the keys "domain" and "reason". A listing that fails
        part-way contains the entries collected up to the failure.
    """
    suspended = _fetch_misskey_instances(
        domain, "/api/federation/instances/", "suspended", "isSuspended"
    )
    blocked = _fetch_misskey_instances(
        domain, "/api/federation/instances", "blocked", "isBlocked"
    )

    return {
        "reject": blocked,
        "followers_only": suspended,
    }
397
def tidyup(domain: str) -> str:
    """Reduce a block list entry to a bare, lower-case host name.

    Block lists contain entries in many shapes. In order, this lower-cases
    the entry and strips a leading http(s) scheme (with or without
    slashes), one trailing slash, a trailing port, a leading "@" and
    everything up to the last remaining "@" (entries naming a single user).

    Args:
        domain: Raw entry as found in a block list.

    Returns:
        The cleaned-up entry.
    """
    # Entries come in variable case.
    domain = domain.lower()

    # Scheme, sometimes even written without slashes ("https:example.com").
    domain = re.sub(r"^https?:/*", "", domain)

    # Trailing slash. Removed before the port so that "host:8080/" loses
    # its port as well.
    domain = re.sub(r"/$", "", domain)

    # Port.
    domain = re.sub(r":\d+$", "", domain)

    # Leading "@".
    domain = re.sub(r"^@", "", domain)

    # Entries that try to block an individual user ("user@host").
    domain = re.sub(r"(.+)@", "", domain)

    return domain
415     domain = re.sub("(.+)\@", "", domain)
416
417     return domain