]> git.mxchange.org Git - fba.git/blob - fba.py
Continued:
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9
# Load the runtime settings once, at import time, from config.json in the
# current working directory.
with open("config.json") as f:
    config = json.load(f)

# Domains singled out by the maintainers; how the list is applied is decided
# by code outside this section (presumably these hosts are skipped - TODO confirm).
blacklist = [
    "activitypub-troll.cf",
    "gab.best",
    "4chan.icu",
    "social.shrimpcam.pw",
    "mastotroll.netz.org",
    "ngrok.io",
]

# Default HTTP headers sent with every outgoing request made in this module.
headers = {"user-agent": config["useragent"]}

# Shared SQLite connection and cursor used by the database helpers below.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
28
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of `domain` (UTF-8 encoded)."""
    # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
    digest = sha256()
    digest.update(domain.encode("utf-8"))
    return digest.hexdigest()
32
def get_peers(domain: str) -> list:
    """Fetch the peer list published by `domain`.

    Requests ``https://{domain}/api/v1/instance/peers`` with the module's
    default headers and a 5 second timeout.

    Returns:
        The decoded JSON body of the response, or None when the request or
        the JSON decoding failed (a warning is printed in that case).
    """
    # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
    peers = None

    try:
        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
        peers = res.json()
    except Exception:
        # Best effort: an unreachable or misbehaving host must not abort the
        # run. Catching Exception (not a bare except) lets Ctrl-C and
        # sys.exit() still propagate.
        print("WARNING: Cannot fetch peers:", domain)

    # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
    return peers
45
def post_json_api(domain: str, path: str, data: str) -> list:
    """POST `data` to ``https://{domain}{path}`` and return the decoded JSON reply.

    Args:
        domain: Host name of the instance to query.
        path: Absolute API path, appended verbatim to the host.
        data: Request body, sent as-is.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: The server answered with a non-success status.
        Exception: Whatever the HTTP client or the JSON decoder raises is
            passed through unchanged.
    """
    # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
    res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5)

    if not res.ok:
        print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
        # A bare `raise` has no active exception here and would only produce
        # an opaque "No active exception to reraise" RuntimeError. Raise the
        # same exception type, but with a message that names the failure.
        raise RuntimeError(
            f"Cannot query JSON API: domain={domain} path={path} status={res.status_code}"
        )

    doc = res.json()
    # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
    return doc
57
def fetch_nodeinfo(domain: str) -> dict:
    """Fetch the nodeinfo document of `domain`, trying several well-known URLs.

    The URLs are tried in this order, each one only when the previous response
    looked unusable: ``/nodeinfo/2.1.json``, ``/nodeinfo/2.0``,
    ``/nodeinfo/2.0.json``, ``/nodeinfo/2.1`` and finally ``/api/v1/instance``.

    Returns:
        The decoded JSON body of the last response when it was successful,
        otherwise None (request failed, non-success status, or invalid JSON).
    """
    print("DEBUG: Fetching nodeinfo from domain:", domain)
    # Named `nodeinfo` rather than `json` so the json module is not shadowed.
    nodeinfo = None

    try:
        print("DEBUG: Fetching 2.1.json from domain:", domain)
        res = reqto.get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)

        if res.status_code == 404 or "text/html" in res.headers["content-type"]:
            print("DEBUG: Fetching 2.0 from domain:", domain)
            res = reqto.get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)

        if res.status_code == 404 or "text/html" in res.headers["content-type"]:
            print("DEBUG: Fetching 2.0.json from domain:", domain)
            res = reqto.get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)

        # NOTE(review): unlike the other fallbacks this one requires a
        # successful HTML response instead of "404 or HTML"; kept as found.
        if res.ok and "text/html" in res.headers["content-type"]:
            print("DEBUG: Fetching 2.1 from domain:", domain)
            res = reqto.get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)

        if res.status_code == 404 or "text/html" in res.headers["content-type"]:
            print("DEBUG: Fetching /api/v1/instance from domain:", domain)
            res = reqto.get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)

        print("DEBUG: domain,res.ok,res.status_code:", domain, res.ok, res.status_code)
        if res.ok:
            nodeinfo = res.json()

    except Exception:
        # Covers network errors, a missing content-type header and invalid
        # JSON alike; Ctrl-C and sys.exit() are no longer swallowed.
        print("WARNING: Failed fetching nodeinfo from domain:", domain)

    # len(None) would raise TypeError, so report 0 when nothing was fetched.
    print("DEBUG: Returning json():", 0 if nodeinfo is None else len(nodeinfo))
    return nodeinfo
90
def determine_software(domain: str) -> str:
    """Return the software name `domain` reports, with known forks folded in.

    The name is read from ``software.name`` of the document returned by
    fetch_nodeinfo(). Forks are reported under the name of the project they
    derive from (e.g. "akkoma" -> "pleroma"); any other name is returned
    unchanged.

    Returns:
        The (possibly folded) software name, or None when the nodeinfo
        document is unavailable or carries no ``software.name`` entry.
    """
    # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
    # Fork name -> name of the software it is treated as.
    aliases = {
        "akkoma": "pleroma",
        "rebased": "pleroma",
        "hometown": "mastodon",
        "ecko": "mastodon",
        "calckey": "misskey",
        "groundpolis": "misskey",
        "foundkey": "misskey",
        "cherrypick": "misskey",
    }

    nodeinfo = fetch_nodeinfo(domain)
    # NOISY-DEBUG: print("DEBUG: nodeinfo:", nodeinfo)

    try:
        name = nodeinfo["software"]["name"]
    except (KeyError, TypeError):
        # TypeError: nodeinfo is None or not a mapping; KeyError: the document
        # (e.g. an /api/v1/instance reply) has no software.name entry.
        print("WARNING: Cannot determine software for domain:", domain)
        return None

    # A list lookup keeps the original semantics even for unhashable values.
    software = aliases[name] if isinstance(name, str) and name in aliases else name

    # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
    return software
108
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Fill in the reason of an existing block row that has none yet.

    Only rows matching `blocker`, `blocked` and `block_level` whose reason is
    the empty string are updated. Any failure while executing the statement
    prints an error and terminates the process with exit status 255.
    """
    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
    statement = "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''"
    parameters = (reason, blocker, blocked, block_level)

    try:
        c.execute(statement, parameters)
    except:
        print("ERROR: failed SQL query")
        sys.exit(255)
125
def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
    """Store `last_seen` on the block row identified by the other arguments.

    The row is selected by `blocker`, `blocked` and `block_level`. Any failure
    while executing the statement prints an error and terminates the process
    with exit status 255.
    """
    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
    statement = "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?"
    parameters = (last_seen, blocker, blocked, block_level)

    try:
        c.execute(statement, parameters)
    except:
        print("ERROR: failed SQL query")
        sys.exit(255)
142
def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
    """Insert a new row into the blocks table.

    Args:
        blocker: Domain that issued the block.
        blocked: Domain being blocked.
        reason: Reason text stored with the block.
        block_level: Block level label stored with the block.
        first_added: Value stored in the fifth column of the new row.
        last_seen: Value stored in the sixth column of the new row.

    Raises:
        RuntimeError: `blocker` or `blocked` contains an "@" after its first
            character, i.e. looks like an account handle, not a domain.

    A failing INSERT prints an error and terminates the process with exit
    status 255.
    """
    # NOTE(review): `find("@") > 0` lets a value that *starts* with "@"
    # through; kept as found - confirm whether that is intended.
    if blocker.find("@") > 0:
        print("WARNING: Bad blocker:", blocker)
        # A bare `raise` has no active exception here; raise the RuntimeError
        # it would have produced, but with a meaningful message.
        raise RuntimeError(f"Bad blocker: {blocker}")
    elif blocked.find("@") > 0:
        print("WARNING: Bad blocked:", blocked)
        raise RuntimeError(f"Bad blocked: {blocked}")

    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
    try:
        c.execute(
            "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
            (
                blocker,
                blocked,
                reason,
                block_level,
                first_added,
                last_seen,
            ),
        )

    except Exception:
        # Exception instead of a bare except: Ctrl-C is no longer reported as
        # a failed SQL query.
        print("ERROR: failed SQL query")
        sys.exit(255)
168
def add_instance(domain: str):
    """Insert `domain` into the instances table.

    The row consists of the domain, its hash (get_hash) and the software it
    runs (determine_software), in that order.

    Raises:
        RuntimeError: `domain` contains an "@" after its first character,
            i.e. looks like an account handle, not a domain.

    Any failure while gathering the values or running the INSERT prints an
    error and terminates the process with exit status 255.
    """
    # NOTE(review): `find("@") > 0` lets a value that *starts* with "@"
    # through; kept as found - confirm whether that is intended.
    if domain.find("@") > 0:
        print("WARNING: Bad domain name:", domain)
        # A bare `raise` has no active exception here; raise the RuntimeError
        # it would have produced, but with a meaningful message.
        raise RuntimeError(f"Bad domain name: {domain}")

    print("--- Adding new instance:", domain)
    try:
        c.execute(
            "INSERT INTO instances SELECT ?, ?, ?",
            (
                domain,
                get_hash(domain),
                determine_software(domain),
            ),
        )

    except Exception:
        # Exception instead of a bare except: Ctrl-C is no longer reported as
        # a failed SQL query.
        print("ERROR: failed SQL query")
        sys.exit(255)
188
def send_bot_post(instance: str, blocks: list):
    """Publish a status listing the instances that `instance` has blocked.

    Args:
        instance: Name of the blocking instance, used in the headline.
        blocks: Sequence of dicts, each with a "blocked" and a "reason" entry.
            At most the first 20 are listed; the dicts are not modified.

    Returns:
        True once the status has been posted and the reply decoded as JSON.

    The status is sent to ``{config['bot_instance']}/api/v1/statuses`` with
    the bot's bearer token; errors from the HTTP client or the JSON decoder
    propagate to the caller.
    """
    max_entries = 20

    truncated = len(blocks) > max_entries
    # The slice must keep exactly as many entries as the notice below claims.
    listed = blocks[:max_entries]

    parts = [instance + " has blocked the following instances:\n\n"]

    for block in listed:
        reason = block["reason"]

        if reason is None or reason == '':
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        if len(reason) > 420:
            # Shorten on a local copy so the caller's data stays untouched.
            reason = reason[0:419] + "[…]"

        # A zero-width space after "@" keeps handles in the reason from
        # turning into mentions.
        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append("(the list has been truncated to the first 20 entries)")

    message = "".join(parts)

    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    # The decoded reply is not used, but decoding it still fails loudly when
    # the server did not answer with JSON.
    reqto.post(
        f"{config['bot_instance']}/api/v1/statuses",
        data={"status": message, "visibility": config['bot_visibility'], "content_type": "text/plain"},
        headers=botheaders,
        timeout=10,
    ).json()

    return True
216
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the block lists shown on a Mastodon instance's /about/more page.

    Every <h3> heading whose (translated) text names a known section is
    followed to the next <table>; each row after the first yields one entry
    with "domain", "hash" and "reason".

    Returns:
        A dict with the keys "reject", "media_removal" and "followers_only",
        each holding a list of entries, or an empty dict when the page could
        not be fetched or parsed.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
    # Entries collected per canonical (English) section heading.
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    # Alternative and localized headings mapped to their canonical form.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        page = reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5)
        doc = BeautifulSoup(page.text, "html.parser")
    except:
        print("ERROR: Cannot fetch from domain:", domain)
        return {}

    for header in doc.find_all("h3"):
        section = translations.get(header.text, header.text)

        if section not in blocks:
            continue

        # find_all_next (not find_next_siblings) also reaches tables that an
        # instance hides inside e.g. a dropdown menu.
        table = header.find_all_next("table")[0]

        # The first row is skipped (presumably the table header).
        for row in table.find_all("tr")[1:]:
            span = row.find("span")
            blocks[section].append(
                {
                    "domain": span.text,
                    # Drops the first 9 characters of the title attribute
                    # (presumably a "SHA-256: " label - TODO confirm).
                    "hash": span["title"][9:],
                    "reason": row.find_all("td")[1].text.strip(),
                }
            )

    # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
    silenced = blocks["Limited servers"] + blocks["Silenced servers"]
    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "followers_only": silenced,
    }
275
def get_friendica_blocks(domain: str) -> dict:
    """Scrape the block list shown on a Friendica instance's /friendica page.

    The list is read from the table inside the element with the id
    "about_blocklist"; each row after the first yields one entry with
    "domain" and "reason".

    Returns:
        ``{"reject": [...]}`` with the collected entries, or an empty dict
        when the page could not be fetched or shows no block list.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    try:
        page = reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5)
        doc = BeautifulSoup(page.text, "html.parser")
    except:
        print("WARNING: Failed to fetch /friendica from domain:", domain)
        return {}

    blocklist = doc.find(id="about_blocklist")

    if blocklist is None:
        # Nothing to parse; returning early avoids an AttributeError below.
        # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    # The first row is skipped (presumably the table header).
    rows = blocklist.find("table").find_all("tr")[1:]
    blocks = [
        {
            "domain": cells[0].text.strip(),
            "reason": cells[1].text.strip(),
        }
        for cells in (row.find_all("td") for row in rows)
    ]

    # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
    return {"reject": blocks}
306
def _fetch_misskey_instances(domain: str, path: str, filter_key: str, flag_key: str, step: int = 99) -> list:
    """Page through a Misskey federation listing and collect flagged hosts.

    Args:
        domain: Host name of the Misskey instance to query.
        path: API path handed to post_json_api().
        filter_key: Request field set to True to select the wanted instances
            ("suspended" or "blocked").
        flag_key: Field of each returned instance that must be truthy for it
            to be collected ("isSuspended" or "isBlocked").
        step: Page size sent as "limit".

    Returns:
        A list of ``{"domain": host, "reason": ""}`` entries. On any error the
        entries collected so far are returned (best effort).
    """
    found = []
    offset = 0

    while True:
        payload = {
            "sort": "+caughtAt",
            "host": None,
            filter_key: True,
            "limit": step,
        }

        if offset > 0:
            # Skip exactly the entries already fetched; an offset one short
            # of that would return the last entry of the previous page again.
            payload["offset"] = offset

        try:
            # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
            doc = post_json_api(domain, path, json.dumps(payload))

            # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
            for instance in doc:
                # Re-check the flag, just in case the server ignored the filter.
                if instance[flag_key]:
                    found.append(
                        {
                            "domain": instance["host"],
                            # The API provides no reason for the block.
                            "reason": "",
                        }
                    )

            page_size = len(doc)
        except Exception:
            print("WARNING: Caught error, exiting loop:", domain)
            break

        if page_size < step:
            # A short (or empty) page means the listing is exhausted.
            # NOISY-DEBUG: print("DEBUG: End of request:", page_size, step)
            break

        offset += step

    return found


def get_misskey_blocks(domain: str) -> dict:
    """Fetch the federation restrictions of a Misskey instance via its API.

    The listing cannot be requested in one piece, so both categories are
    fetched page by page, each starting from the first entry.

    Returns:
        A dict with "reject" (instances flagged as blocked) and
        "followers_only" (instances flagged as suspended), each a list of
        ``{"domain": ..., "reason": ""}`` entries. A category whose request
        fails contains whatever had been collected up to that point.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
    # "suspended" is Misskey's term for what is treated here as follow-only.
    suspended = _fetch_misskey_instances(domain, "/api/federation/instances/", "suspended", "isSuspended")

    # "blocked" is the full suspension.
    blocked = _fetch_misskey_instances(domain, "/api/federation/instances", "blocked", "isBlocked")

    # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocked), len(suspended))
    return {
        "reject": blocked,
        "followers_only": suspended,
    }
421
def tidyup(domain: str) -> str:
    """Normalize a block-list entry to a bare, lower-case host name.

    Removes, in this order: a leading ``http:``/``https:`` scheme (with or
    without slashes), one trailing slash, a trailing ``:port``, a leading
    "@", and everything up to and including the last "@" (entries that name
    an individual account instead of a domain).

    Args:
        domain: Raw entry as found in a block list.

    Returns:
        The cleaned-up domain.
    """
    # Block lists contain entries in mixed case.
    domain = domain.lower()

    # Some entries carry the scheme, sometimes even without slashes.
    domain = re.sub(r"^https?:/*", "", domain)

    # Drop a trailing slash before looking for the port: otherwise the port
    # in "host:8080/" is not at the end of the string and survives.
    domain = re.sub(r"/$", "", domain)

    # Some entries carry the port.
    domain = re.sub(r":\d+$", "", domain)

    # Some entries start with an "@".
    domain = re.sub(r"^@", "", domain)

    # Some entries name an individual user; keep only the part after the "@".
    domain = re.sub(r".+@", "", domain)

    return domain