]> git.mxchange.org Git - fba.git/blob - fba.py
9eefe74dac19bdfc63701f07c115124188e98e7b
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9
# Runtime settings are read once, when the module is loaded.
with open("config.json") as f:
    config = json.load(f)

# Hard-coded list of domains.
# NOTE(review): not referenced in this part of the file - presumably these
# instances are skipped by the crawler; verify against the callers.
blacklist = [
    "activitypub-troll.cf",
    "gab.best",
    "4chan.icu",
    "social.shrimpcam.pw",
    "mastotroll.netz.org",
]

# Default HTTP headers passed to the reqto calls below.
headers = {"user-agent": config["useragent"]}

# Shared SQLite connection and cursor used by the database helpers below.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
27
28 def get_hash(domain: str) -> str:
29     # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
30     return sha256(domain.encode("utf-8")).hexdigest()
31
def get_peers(domain: str) -> list:
    """Fetch the peer list an instance publishes.

    Sends a GET request to ``/api/v1/instance/peers`` on ``domain``.

    Args:
        domain: Host name of the instance to query.

    Returns:
        The decoded JSON body of the response, or ``None`` when the request
        or the JSON decoding failed (a warning is printed in that case).
    """
    # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
    peers = None

    # Bound up front: if the request itself fails there is no response object,
    # and the handler below must not trip over an unbound name.
    res = None

    try:
        res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
        peers = res.json()
    except Exception:
        status = res.status_code if res is not None else "no response"
        print("WARNING: Cannot fetch peers:", domain, status)

    # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
    return peers
44
def post_json_api(domain: str, path: str, data: str) -> list:
    """POST ``data`` to ``https://{domain}{path}`` and return the decoded JSON reply.

    Args:
        domain: Host name of the instance to query.
        path: Absolute request path, appended to the host name as-is.
        data: Request body, passed through to ``reqto.post`` unchanged.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the response status is not OK.
    """
    # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data:", domain, path, data)
    res = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5)

    if not res.ok:
        print("WARNING: Cannot query JSON API:", domain, path, data, res.status_code)
        # A bare ``raise`` here only produced a RuntimeError by accident (there
        # is no active exception to re-raise) and would re-raise an unrelated
        # exception when called from inside a handler. Raise the same type
        # explicitly, with a useful message.
        raise RuntimeError(f"POST https://{domain}{path} failed with status {res.status_code}")

    doc = res.json()
    # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
    return doc
56
def determine_software(domain: str) -> str:
    """Determine which server software an instance runs.

    Tries several nodeinfo URLs in turn and reads ``software.name`` from the
    first usable document. Known forks are reported under the name of the
    software they are mapped to below. Only when nodeinfo answers 404 is
    ``/api/v1/instance`` probed; a successful reply is then reported as
    ``"mastodon"``.

    Args:
        domain: Host name of the instance to inspect.

    Returns:
        The software name, or ``None`` when it could not be determined
        (including any error while fetching or decoding, which is reported
        with a warning).
    """
    # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
    # Fork name -> name it is reported as.
    aliases = {
        "akkoma": "pleroma",
        "rebased": "pleroma",
        "hometown": "mastodon",
        "ecko": "mastodon",
        "calckey": "misskey",
        "groundpolis": "misskey",
        "foundkey": "misskey",
        "cherrypick": "misskey",
    }

    software = None
    try:
        res = reqto.get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)

        # NOISY-DEBUG: print("DEBUG: domain,res.ok,res.status_code:", domain, res.ok, res.status_code)
        if res.status_code == 404:
            res = reqto.get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)

        if res.status_code == 404:
            res = reqto.get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)

        # An HTML answer is not a nodeinfo document; try the extension-less 2.1 URL.
        if res.ok and "text/html" in res.headers["content-type"]:
            res = reqto.get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)

        if res.ok:
            nodeinfo = res.json()
            # NOISY-DEBUG: print("DEBUG: nodeinfo():", len(nodeinfo))
            name = nodeinfo["software"]["name"]
            software = aliases.get(name, name)
        elif res.status_code == 404:
            res = reqto.get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)

            # This check must stay inside the fallback branch: at the outer
            # level it would replace every nodeinfo result with "mastodon".
            if res.ok:
                software = "mastodon"
    except Exception:
        print("WARNING: Failed fetching instance meta data")

    # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
    return software
94
def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Store ``reason`` on a matching block record that has no reason yet.

    Only rows matching ``blocker``, ``blocked`` and ``block_level`` whose
    reason is the empty string are touched. Any failure while executing the
    statement terminates the process with exit status 255.
    """
    # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
    query = "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''"
    params = (reason, blocker, blocked, block_level)

    try:
        c.execute(query, params)
    except:
        # Every failure is treated as fatal.
        print("ERROR: failed SQL query")
        sys.exit(255)
111
def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
    """Set ``last_seen`` on the block records matching the given key.

    Rows are selected by ``blocker``, ``blocked`` and ``block_level``. Any
    failure while executing the statement terminates the process with exit
    status 255.
    """
    # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
    query = "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?"
    params = (last_seen, blocker, blocked, block_level)

    try:
        c.execute(query, params)
    except:
        # Every failure is treated as fatal.
        print("ERROR: failed SQL query")
        sys.exit(255)
128
def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
    """Insert a new row into the ``blocks`` table.

    The six values are inserted in the order blocker, blocked, reason,
    block_level, first_added, last_seen. Any failure while executing the
    statement terminates the process with exit status 255.
    """
    print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
    row = (blocker, blocked, reason, block_level, first_added, last_seen)

    try:
        c.execute("INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?", row)
    except:
        # Every failure is treated as fatal.
        print("ERROR: failed SQL query")
        sys.exit(255)
147
def add_instance(domain: str):
    """Insert a new row into the ``instances`` table for ``domain``.

    The row consists of the domain, its hash (``get_hash``) and the detected
    software (``determine_software``). Any failure while building or inserting
    the row terminates the process with exit status 255.
    """
    print("--- Adding new instance:", domain)

    try:
        # Built inside the try block so that a failure in either helper is
        # handled the same way as a failing statement.
        row = (domain, get_hash(domain), determine_software(domain))
        c.execute("INSERT INTO instances SELECT ?, ?, ?", row)
    except:
        # Every failure is treated as fatal.
        print("ERROR: failed SQL query")
        sys.exit(255)
163
def send_bot_post(instance: str, blocks: list):
    """Post a summary of an instance's blocks through the configured bot account.

    The status lists at most 20 blocks. Reasons longer than 420 characters
    are cut to 420 characters and marked with "[…]". The entries in
    ``blocks`` are not modified.

    Args:
        instance: Name of the blocking instance, used in the headline.
        blocks: Sequence of mappings with the keys ``"blocked"`` and
            ``"reason"`` (the reason may be ``None`` or empty).

    Returns:
        ``True`` once the request has been sent and its reply decoded.
    """
    max_entries = 20
    max_reason_length = 420

    truncated = len(blocks) > max_entries
    parts = [instance + " has blocked the following instances:\n\n"]

    # The slice end is exclusive, so this keeps exactly ``max_entries`` items.
    for block in blocks[:max_entries]:
        reason = block["reason"]

        if reason is None or reason == "":
            parts.append(block["blocked"] + " with unspecified reason\n")
            continue

        # Work on a local copy so the caller's data stays untouched.
        if len(reason) > max_reason_length:
            reason = reason[:max_reason_length] + "[…]"

        # A zero-width space is inserted after every "@" - presumably so the
        # text is not turned into a mention; verify against the bot's server.
        parts.append(block["blocked"] + ' for "' + reason.replace("@", "@\u200b") + '"\n')

    if truncated:
        parts.append(f"(the list has been truncated to the first {max_entries} entries)")

    message = "".join(parts)
    botheaders = {**headers, "Authorization": "Bearer " + config["bot_token"]}

    # The reply is still decoded (result unused) so that a non-JSON answer
    # raises here, as it did before.
    reqto.post(
        f"{config['bot_instance']}/api/v1/statuses",
        data={"status": message, "visibility": config["bot_visibility"], "content_type": "text/plain"},
        headers=botheaders,
        timeout=10,
    ).json()

    return True
191
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the block tables from the ``/about/more`` page of ``domain``.

    Every ``<h3>`` heading that names a known section (directly or through
    the translation table) is followed to the next ``<table>``; each row after
    the first becomes one entry with the keys ``"domain"``, ``"hash"`` and
    ``"reason"``.

    Returns:
        A dict with the keys ``"reject"``, ``"media_removal"`` and
        ``"followers_only"``, or an empty dict when the page could not be
        fetched.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
    # Localized / older section headings mapped onto the keys of `sections`.
    translations = {
        "Silenced instances": "Silenced servers",
        "Suspended instances": "Suspended servers",
        "Gesperrte Server": "Suspended servers",
        "Gefilterte Medien": "Filtered media",
        "Stummgeschaltete Server": "Silenced servers",
        "停止済みのサーバー": "Suspended servers",
        "メディアを拒否しているサーバー": "Filtered media",
        "サイレンス済みのサーバー": "Silenced servers",
        "שרתים מושעים": "Suspended servers",
        "מדיה מסוננת": "Filtered media",
        "שרתים מוגבלים": "Silenced servers",
        "Serveurs suspendus": "Suspended servers",
        "Médias filtrés": "Filtered media",
        "Serveurs limités": "Silenced servers",
    }

    try:
        page = reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text
        doc = BeautifulSoup(page, "html.parser")
    except:
        print("ERROR: Cannot fetch from domain:", domain)
        return {}

    sections = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    for header in doc.find_all("h3"):
        heading = header.text
        heading = translations.get(heading, heading)

        if heading not in sections:
            continue

        # find_all_next (instead of find_next_siblings) also reaches tables
        # that are not direct siblings, e.g. lists hidden in a dropdown menu.
        table = header.find_all_next("table")[0]

        # The first row is skipped.
        for row in table.find_all("tr")[1:]:
            span = row.find("span")
            sections[heading].append({
                "domain": span.text,
                # The title attribute minus its first 9 characters.
                "hash": span["title"][9:],
                "reason": row.find_all("td")[1].text.strip(),
            })

    # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
    return {
        "reject": sections["Suspended servers"],
        "media_removal": sections["Filtered media"],
        "followers_only": sections["Limited servers"] + sections["Silenced servers"],
    }
250
def get_friendica_blocks(domain: str) -> dict:
    """Scrape the block list from the ``/friendica`` page of ``domain``.

    The table inside the element with id ``about_blocklist`` is read; each
    row after the first yields an entry with ``"domain"`` (first cell) and
    ``"reason"`` (second cell).

    Returns:
        ``{"reject": [...]}``, or an empty dict when the page could not be
        fetched or contains no block list element.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
    try:
        page = reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text
        doc = BeautifulSoup(page, "html.parser")
    except:
        print("WARNING: Failed to fetch /friendica from domain:", domain)
        return {}

    blocklist = doc.find(id="about_blocklist")

    # No block list element on the page: nothing to report.
    if blocklist is None:
        # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
        return {}

    entries = []
    for row in blocklist.find("table").find_all("tr")[1:]:
        cells = row.find_all("td")
        entries.append({
            "domain": cells[0].text.strip(),
            "reason": cells[1].text.strip(),
        })

    # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(entries))
    return {"reject": entries}
281
def _fetch_misskey_instances(domain: str, path: str, query_flag: str, state_key: str, step: int = 99) -> list:
    """Page through a Misskey federation listing and collect the flagged hosts.

    Args:
        domain: Host name of the instance to query.
        path: API path the query is POSTed to.
        query_flag: Name of the boolean filter sent in the query
            (``"suspended"`` or ``"blocked"``).
        state_key: Key of each returned record that must be truthy for the
            record to be kept (``"isSuspended"`` or ``"isBlocked"``).
        step: Page size requested per call.

    Returns:
        A list of ``{"domain": host, "reason": ""}`` entries. If a request or
        a record fails, a warning is printed and whatever was collected up to
        that point is returned.
    """
    found = []
    offset = 0

    while True:
        query = {
            "sort": "+caughtAt",
            "host": None,
            query_flag: True,
            "limit": step,
        }

        # The first page is requested without an offset. Later pages start
        # right after the records already seen: `offset - 1` would fetch the
        # last record of the previous page a second time.
        if offset > 0:
            query["offset"] = offset

        try:
            # NOISY-DEBUG: print("DEBUG: Sending JSON API request to domain,step,offset:", domain, step, offset)
            doc = post_json_api(domain, path, json.dumps(query))

            # NOISY-DEBUG: print("DEBUG: doc():", len(doc))
            if len(doc) == 0:
                break

            for instance in doc:
                # Re-check the state on each record, just in case.
                if instance[state_key]:
                    # The API offers no reason field.
                    found.append({"domain": instance["host"], "reason": ""})

            # A short page is the last one.
            if len(doc) < step:
                break
        except Exception:
            print("WARNING: Caught error, exiting loop:", domain)
            break

        offset += step

    return found


def get_misskey_blocks(domain: str) -> dict:
    """Fetch the suspended and blocked instances of a Misskey server.

    The API is queried page by page, since it does not return the full list
    in one response. Both listings are paged independently, each starting at
    the first record.

    Args:
        domain: Host name of the instance to query.

    Returns:
        A dict with ``"reject"`` (records flagged ``isBlocked``) and
        ``"followers_only"`` (records flagged ``isSuspended``), or an empty
        dict if an unexpected error escapes the paging helper.
    """
    # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
    try:
        # Paths are kept exactly as before (note the trailing slash on the first).
        suspended = _fetch_misskey_instances(domain, "/api/federation/instances/", "suspended", "isSuspended")
        blocked = _fetch_misskey_instances(domain, "/api/federation/instances", "blocked", "isBlocked")

        # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocked), len(suspended))
        return {
            "reject": blocked,
            "followers_only": suspended,
        }
    except Exception:
        print("WARNING: API request failed for domain:", domain)
        return {}
396
def tidyup(domain: str) -> str:
    """Normalize a block-list entry to a bare, lower-case host name.

    Removes, in this order: an ``http:``/``https:`` scheme (with or without
    slashes), one trailing slash, a leading ``@``, everything up to and
    including the last ``@`` (entries that name a user), and a trailing
    ``:port``. The port is removed last so that it is also caught when the
    entry carried a scheme and a trailing slash.

    Args:
        domain: Raw entry as found in a block list.

    Returns:
        The cleaned-up host name.
    """
    # Block lists use inconsistent letter case.
    domain = domain.lower()

    # Some entries include the scheme, sometimes even without slashes.
    domain = re.sub(r"^https?:/*", "", domain)

    # ... and a trailing slash.
    domain = re.sub(r"/$", "", domain)

    # ... or a leading "@".
    domain = re.sub(r"^@", "", domain)

    # Entries targeting an individual user: keep only the host part.
    domain = re.sub(r".+@", "", domain)

    # Finally drop an explicit port.
    domain = re.sub(r":\d+$", "", domain)

    return domain
416     return domain