]> git.mxchange.org Git - fba.git/blob - fba.py
Continued:
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3
4 import reqto
5 import re
6 import sqlite3
7 import json
8 import sys
9
10 with open("config.json") as f:
11     config = json.loads(f.read())
12
13 blacklist = [
14     "activitypub-troll.cf",
15     "gab.best",
16     "4chan.icu",
17     "social.shrimpcam.pw",
18     "mastotroll.netz.org"
19 ]
20
21 headers = {
22     "user-agent": config["useragent"]
23 }
24
25 conn = sqlite3.connect("blocks.db")
26 c = conn.cursor()
27
28 def get_hash(domain: str) -> str:
29     # NOISY-DEBUG: print("DEBUG: Calculating hash for domain:", domain)
30     return sha256(domain.encode("utf-8")).hexdigest()
31
32 def get_peers(domain: str) -> str:
33     # NOISY-DEBUG: print("DEBUG: Getting peers for domain:", domain)
34     peers = None
35
36     try:
37         res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
38         peers = res.json()
39     except:
40         print("WARNING: Cannot fetch peers:", domain, res.status_code)
41
42     # NOISY-DEBUG: print("DEBUG: Returning peers[]:", type(peers))
43     return peers
44
45 def post_json_api(domain: str, path: str, data: str) -> str:
46     # NOISY-DEBUG: print("DEBUG: Sending POST to domain,path,data():", domain, path, len(data))
47     doc = reqto.post(f"https://{domain}{path}", data=data, headers=headers, timeout=5).json()
48
49     if doc == []:
50         print("WARNING: Cannot query JSON API:", domain, path)
51         raise
52
53     # NOISY-DEBUG: print("DEBUG: Returning doc():", len(doc))
54     return doc
55
56 def determine_software(domain: str) -> str:
57     # NOISY-DEBUG: print("DEBUG: Determining software for domain:", domain)
58     software = None
59     try:
60         res = reqto.get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
61         if res.status_code == 404:
62             res = reqto.get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
63         if res.status_code == 404:
64             res = reqto.get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
65         if res.ok and "text/html" in res.headers["content-type"]:
66             res = reqto.get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
67         if res.ok:
68             if res.json()["software"]["name"] in ["akkoma", "rebased"]:
69                 software = "pleroma"
70             elif res.json()["software"]["name"] in ["hometown", "ecko"]:
71                 software = "mastodon"
72             elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
73                 software = "misskey"
74             else:
75                 software = res.json()["software"]["name"]
76         elif res.status_code == 404:
77             res = reqto.get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
78         if res.ok:
79             software = "mastodon"
80     except:
81         print("WARNING: Failed fetching instance meta data")
82
83     # NOISY-DEBUG: print("DEBUG: Returning domain,software:", domain, software)
84     return software
85
86 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
87     # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
88     try:
89         c.execute(
90             "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
91             (
92                 reason,
93                 blocker,
94                 blocked,
95                 block_level
96             ),
97         )
98
99     except:
100         print("ERROR: failed SQL query")
101         sys.exit(255)
102
103 def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
104     # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
105     try:
106         c.execute(
107             "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
108             (
109                 last_seen,
110                 blocker,
111                 blocked,
112                 block_level
113             )
114         )
115
116     except:
117         print("ERROR: failed SQL query")
118         sys.exit(255)
119
120 def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
121     print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
122     try:
123         c.execute(
124             "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
125              (
126                  blocker,
127                  blocked,
128                  reason,
129                  block_level,
130                  first_added,
131                  last_seen
132              ),
133         )
134
135     except:
136         print("ERROR: failed SQL query")
137         sys.exit(255)
138
139 def add_instance(domain: str):
140     print("--- Adding new instance:", domain)
141     try:
142         c.execute(
143             "INSERT INTO instances SELECT ?, ?, ?",
144             (
145                domain,
146                get_hash(domain),
147                determine_software(domain)
148             ),
149         )
150
151     except:
152         print("ERROR: failed SQL query")
153         sys.exit(255)
154
155 def send_bot_post(instance: str, blocks: dict):
156     message = instance + " has blocked the following instances:\n\n"
157     truncated = False
158
159     if len(blocks) > 20:
160         truncated = True
161         blocks = blocks[0 : 19]
162
163     for block in blocks:
164         if block["reason"] == None or block["reason"] == '':
165             message = message + block["blocked"] + " with unspecified reason\n"
166         else:
167             if len(block["reason"]) > 420:
168                 block["reason"] = block["reason"][0:419] + "[…]"
169
170             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
171
172     if truncated:
173         message = message + "(the list has been truncated to the first 20 entries)"
174
175     botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
176
177     req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
178         data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
179         headers=botheaders, timeout=10).json()
180
181     return True
182
183 def get_mastodon_blocks(domain: str) -> dict:
184     # NOISY-DEBUG: print("DEBUG: Fetching mastodon blocks from domain:", domain)
185     blocks = {
186         "Suspended servers": [],
187         "Filtered media": [],
188         "Limited servers": [],
189         "Silenced servers": [],
190     }
191
192     translations = {
193         "Silenced instances": "Silenced servers",
194         "Suspended instances": "Suspended servers",
195         "Gesperrte Server": "Suspended servers",
196         "Gefilterte Medien": "Filtered media",
197         "Stummgeschaltete Server": "Silenced servers",
198         "停止済みのサーバー": "Suspended servers",
199         "メディアを拒否しているサーバー": "Filtered media",
200         "サイレンス済みのサーバー": "Silenced servers",
201         "שרתים מושעים": "Suspended servers",
202         "מדיה מסוננת": "Filtered media",
203         "שרתים מוגבלים": "Silenced servers",
204         "Serveurs suspendus": "Suspended servers",
205         "Médias filtrés": "Filtered media",
206         "Serveurs limités": "Silenced servers",
207     }
208
209     try:
210         doc = BeautifulSoup(
211             reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
212             "html.parser",
213         )
214     except:
215         print("ERROR: Cannot fetch from domain:", domain)
216         return {}
217
218     for header in doc.find_all("h3"):
219         header_text = header.text
220
221         if header_text in translations:
222             header_text = translations[header_text]
223
224         if header_text in blocks:
225             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
226             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
227                 blocks[header_text].append(
228                     {
229                         "domain": line.find("span").text,
230                         "hash": line.find("span")["title"][9:],
231                         "reason": line.find_all("td")[1].text.strip(),
232                     }
233                 )
234
235     # NOISY-DEBUG: print("DEBUG: Returning blocks for domain:", domain)
236     return {
237         "reject": blocks["Suspended servers"],
238         "media_removal": blocks["Filtered media"],
239         "followers_only": blocks["Limited servers"] + blocks["Silenced servers"],
240     }
241
242 def get_friendica_blocks(domain: str) -> dict:
243     # NOISY-DEBUG: print("DEBUG: Fetching friendica blocks from domain:", domain)
244     blocks = []
245
246     try:
247         doc = BeautifulSoup(
248             reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
249             "html.parser",
250         )
251     except:
252         print("WARNING: Failed to fetch /friendica from domain:", domain)
253         return {}
254
255     blocklist = doc.find(id="about_blocklist")
256
257     # Prevents exceptions:
258     if blocklist is None:
259         # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
260         return {}
261
262     for line in blocklist.find("table").find_all("tr")[1:]:
263         blocks.append({
264             "domain": line.find_all("td")[0].text.strip(),
265             "reason": line.find_all("td")[1].text.strip()
266         })
267
268     # NOISY-DEBUG: print("DEBUG: Returning blocks() for domain:", domain, len(blocks))
269     return {
270         "reject": blocks
271     }
272
273 def get_misskey_blocks(domain: str) -> dict:
274     # NOISY-DEBUG: print("DEBUG: Fetching misskey blocks from domain:", domain)
275     blocks = {
276         "suspended": [],
277         "blocked": []
278     }
279
280     try:
281         counter = 0
282         step = 99
283         while True:
284             # iterating through all "suspended" (follow-only in its terminology)
285             # instances page-by-page, since that troonware doesn't support
286             # sending them all at once
287             try:
288                 if counter == 0:
289                     doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
290                         "sort": "+caughtAt",
291                         "host": None,
292                         "suspended": True,
293                         "limit": step
294                     }))
295                 else:
296                     doc = post_json_api(domain, "/api/federation/instances/", json.dumps({
297                         "sort": "+caughtAt",
298                         "host": None,
299                         "suspended": True,
300                         "limit": step,
301                         "offset": counter-1
302                     }))
303
304                 for instance in doc:
305                     # just in case
306                     if instance["isSuspended"]:
307                         blocks["suspended"].append(
308                             {
309                                 "domain": instance["host"],
310                                 # no reason field, nothing
311                                 "reason": ""
312                             }
313                         )
314                 counter = counter + step
315             except:
316                 counter = 0
317                 break
318
319         while True:
320             # same shit, different asshole ("blocked" aka full suspend)
321             try:
322                 if counter == 0:
323                     doc = post_json_api(domain,"/api/federation/instances", json.dumps({
324                         "sort": "+caughtAt",
325                         "host": None,
326                         "blocked": True,
327                         "limit": step
328                     }))
329                 else:
330                     doc = post_json_api(domain,"/api/federation/instances", json.dumps({
331                         "sort": "+caughtAt",
332                         "host": None,
333                         "blocked": True,
334                         "limit": step,
335                         "offset": counter-1
336                     }))
337
338                 for instance in doc:
339                     if instance["isBlocked"]:
340                         blocks["blocked"].append({
341                                 "domain": instance["host"],
342                                 "reason": ""
343                         })
344                 counter = counter + step
345
346             except:
347                 counter = 0
348                 break
349
350         # NOISY-DEBUG: print("DEBUG: Returning for domain,blocked(),suspended():", domain, len(blocks["blocked"]), len(blocks["suspended"]))
351         return {
352             "reject": blocks["blocked"],
353             "followers_only": blocks["suspended"]
354         }
355
356     except:
357         print("WARNING: API request failed for domain:", domain)
358         return {}
359
360 def tidyup(domain: str) -> str:
361     # some retards put their blocks in variable case
362     domain = domain.lower()
363
364     # other retards put the port
365     domain = re.sub("\:\d+$", "", domain)
366
367     # bigger retards put the schema in their blocklist, sometimes even without slashes
368     domain = re.sub("^https?\:(\/*)", "", domain)
369
370     # and trailing slash
371     domain = re.sub("\/$", "", domain)
372
373     # and the @
374     domain = re.sub("^\@", "", domain)
375
376     # the biggest retards of them all try to block individual users
377     domain = re.sub("(.+)\@", "", domain)
378
379     return domain