]> git.mxchange.org Git - fba.git/blob - fba.py
188fb70e0b76aaf8d993df56019c3a3cb0d3f3d5
[fba.git] / fba.py
1 from bs4 import BeautifulSoup
2 from hashlib import sha256
3 import reqto
4 import re
5 import sqlite3
6 import json
7 import sys
8
9 with open("config.json") as f:
10     config = json.loads(f.read())
11
12 blacklist = [
13     "activitypub-troll.cf",
14     "gab.best",
15     "4chan.icu",
16     "social.shrimpcam.pw",
17     "mastotroll.netz.org"
18 ]
19
20 headers = {
21     "user-agent": config["useragent"]
22 }
23
24 conn = sqlite3.connect("blocks.db")
25 c = conn.cursor()
26
27 def get_hash(domain: str) -> str:
28     return sha256(domain.encode("utf-8")).hexdigest()
29
30 def get_peers(domain: str) -> str:
31     try:
32         res = reqto.get(f"https://{domain}/api/v1/instance/peers", headers=headers, timeout=5)
33         return res.json()
34     except:
35         print("WARNING: Cannot fetch peers:", domain)
36         return None
37
38 def get_type(instdomain: str) -> str:
39     try:
40         res = reqto.get(f"https://{instdomain}/nodeinfo/2.1.json", headers=headers, timeout=5)
41         if res.status_code == 404:
42             res = reqto.get(f"https://{instdomain}/nodeinfo/2.0", headers=headers, timeout=5)
43         if res.status_code == 404:
44             res = reqto.get(f"https://{instdomain}/nodeinfo/2.0.json", headers=headers, timeout=5)
45         if res.ok and "text/html" in res.headers["content-type"]:
46             res = reqto.get(f"https://{instdomain}/nodeinfo/2.1", headers=headers, timeout=5)
47         if res.ok:
48             if res.json()["software"]["name"] in ["akkoma", "rebased"]:
49                 return "pleroma"
50             elif res.json()["software"]["name"] in ["hometown", "ecko"]:
51                 return "mastodon"
52             elif res.json()["software"]["name"] in ["calckey", "groundpolis", "foundkey", "cherrypick"]:
53                 return "misskey"
54             else:
55                 return res.json()["software"]["name"]
56         elif res.status_code == 404:
57             res = reqto.get(f"https://{instdomain}/api/v1/instance", headers=headers, timeout=5)
58         if res.ok:
59             return "mastodon"
60     except:
61         return None
62
63 def update_block_reason(reason: str, blocker: str, blocked: str, block_level: str):
64     # NOISY: print("--- Updating block reason:", reason, blocker, blocked, block_level)
65     try:
66         c.execute(
67             "UPDATE blocks SET reason = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason = ''",
68             (
69                 reason,
70                 blocker,
71                 blocked,
72                 block_level
73             ),
74         )
75     except:
76         print("ERROR: failed SQL query")
77         sys.exit(255)
78
79 def update_last_seen(last_seen: int, blocker: str, blocked: str, block_level: str):
80     # NOISY: print("--- Updating last_seen:", last_seen, blocker, blocked, block_level)
81     try:
82         c.execute(
83             "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ?",
84             (
85                 last_seen,
86                 blocker,
87                 blocked,
88                 block_level
89             )
90         )
91     except:
92         print("ERROR: failed SQL query")
93         sys.exit(255)
94
95 def block_instance(blocker: str, blocked: str, reason: str, block_level: str, first_added: int, last_seen: int):
96     print("--- New block:", blocker, blocked, reason, block_level, first_added, last_seen)
97     try:
98         c.execute(
99             "INSERT INTO blocks SELECT ?, ?, ?, ?, ?, ?",
100              (
101                  blocker,
102                  blocked,
103                  reason,
104                  block_level,
105                  first_added,
106                  last_seen
107              ),
108         )
109     except:
110         print("ERROR: failed SQL query")
111         sys.exit(255)
112
113 def add_instance(domain: str):
114     print("--- Adding new instance:", domain)
115     try:
116         c.execute(
117             "INSERT INTO instances SELECT ?, ?, ?",
118             (
119                domain,
120                get_hash(domain),
121                get_type(domain)
122             ),
123         )
124     except:
125         print("ERROR: failed SQL query")
126         sys.exit(255)
127
128 def send_bot_post(instance: str, blocks: dict):
129     message = instance + " has blocked the following instances:\n\n"
130     truncated = False
131
132     if len(blocks) > 20:
133         truncated = True
134         blocks = blocks[0 : 19]
135
136     for block in blocks:
137         if block["reason"] == None or block["reason"] == '':
138             message = message + block["blocked"] + " with unspecified reason\n"
139         else:
140             if len(block["reason"]) > 420:
141                 block["reason"] = block["reason"][0:419] + "[…]"
142             message = message + block["blocked"] + ' for "' + block["reason"].replace("@", "@\u200b") + '"\n'
143     if truncated:
144         message = message + "(the list has been truncated to the first 20 entries)"
145
146     botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
147     req = reqto.post(f"{config['bot_instance']}/api/v1/statuses",
148         data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
149         headers=botheaders, timeout=10).json()
150     return True
151
152 def get_mastodon_blocks(domain: str) -> dict:
153     blocks = {
154         "Suspended servers": [],
155         "Filtered media": [],
156         "Limited servers": [],
157         "Silenced servers": [],
158     }
159
160     translations = {
161         "Silenced instances": "Silenced servers",
162         "Suspended instances": "Suspended servers",
163         "Gesperrte Server": "Suspended servers",
164         "Gefilterte Medien": "Filtered media",
165         "Stummgeschaltete Server": "Silenced servers",
166         "停止済みのサーバー": "Suspended servers",
167         "メディアを拒否しているサーバー": "Filtered media",
168         "サイレンス済みのサーバー": "Silenced servers",
169         "שרתים מושעים": "Suspended servers",
170         "מדיה מסוננת": "Filtered media",
171         "שרתים מוגבלים": "Silenced servers",
172         "Serveurs suspendus": "Suspended servers",
173         "Médias filtrés": "Filtered media",
174         "Serveurs limités": "Silenced servers",
175     }
176
177     try:
178         doc = BeautifulSoup(
179             reqto.get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
180             "html.parser",
181         )
182     except:
183         print("ERROR: Cannot fetch from domain:", domain)
184         return {}
185
186     for header in doc.find_all("h3"):
187         header_text = header.text
188         if header_text in translations:
189             header_text = translations[header_text]
190         if header_text in blocks:
191             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
192             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
193                 blocks[header_text].append(
194                     {
195                         "domain": line.find("span").text,
196                         "hash": line.find("span")["title"][9:],
197                         "reason": line.find_all("td")[1].text.strip(),
198                     }
199                 )
200     return {
201         "reject": blocks["Suspended servers"],
202         "media_removal": blocks["Filtered media"],
203         "followers_only": blocks["Limited servers"]
204         + blocks["Silenced servers"],
205     }
206
207 def get_friendica_blocks(domain: str) -> dict:
208     blocks = []
209
210     try:
211         doc = BeautifulSoup(
212             reqto.get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
213             "html.parser",
214         )
215     except:
216         return {}
217
218     blocklist = doc.find(id="about_blocklist")
219
220     # Prevents exceptions:
221     if blocklist is None:
222         # NOISY-DEBUG: print("DEBUG: Instance has no block list:", domain)
223         return {}
224
225     for line in blocklist.find("table").find_all("tr")[1:]:
226         blocks.append(
227             {
228                 "domain": line.find_all("td")[0].text.strip(),
229                 "reason": line.find_all("td")[1].text.strip()
230             }
231         )
232
233     return {
234         "reject": blocks
235     }
236
237 def get_misskey_blocks(domain: str) -> dict:
238     blocks = {
239         "suspended": [],
240         "blocked": []
241     }
242
243     try:
244         counter = 0
245         step = 99
246         while True:
247             # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
248             try:
249                 if counter == 0:
250                     doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
251                     if doc == []: raise
252                 else:
253                     doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
254                     if doc == []: raise
255                 for instance in doc:
256                     # just in case
257                     if instance["isSuspended"]:
258                         blocks["suspended"].append(
259                             {
260                                 "domain": instance["host"],
261                                 # no reason field, nothing
262                                 "reason": ""
263                             }
264                         )
265                 counter = counter + step
266             except:
267                 counter = 0
268                 break
269
270         while True:
271             # same shit, different asshole ("blocked" aka full suspend)
272             try:
273                 if counter == 0:
274                     doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
275                     if doc == []: raise
276                 else:
277                     doc = reqto.post(f"https://{domain}/api/federation/instances", data=json.dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
278                     if doc == []: raise
279                 for instance in doc:
280                     if instance["isBlocked"]:
281                         blocks["blocked"].append(
282                             {
283                                 "domain": instance["host"],
284                                 "reason": ""
285                             }
286                         )
287                 counter = counter + step
288             except:
289                 counter = 0
290                 break
291
292         return {
293             "reject": blocks["blocked"],
294             "followers_only": blocks["suspended"]
295         }
296
297     except:
298         return {}
299
300 def tidyup(domain: str) -> str:
301     # some retards put their blocks in variable case
302     domain = domain.lower()
303
304     # other retards put the port
305     domain = re.sub("\:\d+$", "", domain)
306
307     # bigger retards put the schema in their blocklist, sometimes even without slashes
308     domain = re.sub("^https?\:(\/*)", "", domain)
309
310     # and trailing slash
311     domain = re.sub("\/$", "", domain)
312
313     # and the @
314     domain = re.sub("^\@", "", domain)
315
316     # the biggest retards of them all try to block individual users
317     domain = re.sub("(.+)\@", "", domain)
318
319     return domain