]> git.mxchange.org Git - fba.git/blob - fetch_blocks.py
One more fork rewrite
[fba.git] / fetch_blocks.py
1 from requests import get
2 from requests import post
3 from hashlib import sha256
4 import sqlite3
5 from bs4 import BeautifulSoup
6 from json import dumps
7
8 headers = {
9     "user-agent": "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0"
10 }
11
12
13 def get_mastodon_blocks(domain: str) -> dict:
14     blocks = {
15         "Suspended servers": [],
16         "Filtered media": [],
17         "Limited servers": [],
18         "Silenced servers": [],
19     }
20
21     translations = {
22         "Silenced instances": "Silenced servers",
23         "Suspended instances": "Suspended servers",
24         "Gesperrte Server": "Suspended servers",
25         "Gefilterte Medien": "Filtered media",
26         "Stummgeschaltete Server": "Silenced servers",
27         "停止済みのサーバー": "Suspended servers",
28         "メディアを拒否しているサーバー": "Filtered media",
29         "サイレンス済みのサーバー": "Silenced servers",
30         "Serveurs suspendus": "Suspended servers",
31         "Médias filtrés": "Filtered media",
32         "Serveurs limités": "Silenced servers",
33     }
34
35     try:
36         doc = BeautifulSoup(
37             get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
38             "html.parser",
39         )
40     except:
41         return {}
42
43     for header in doc.find_all("h3"):
44         header_text = header.text
45         if header_text in translations:
46             header_text = translations[header_text]
47         if header_text in blocks:
48             # replaced find_next_siblings with find_all_next to account for instances that e.g. hide lists in dropdown menu
49             for line in header.find_all_next("table")[0].find_all("tr")[1:]:
50                 blocks[header_text].append(
51                     {
52                         "domain": line.find("span").text,
53                         "hash": line.find("span")["title"][9:],
54                         "reason": line.find_all("td")[1].text.strip(),
55                     }
56                 )
57     return {
58         "reject": blocks["Suspended servers"],
59         "media_removal": blocks["Filtered media"],
60         "federated_timeline_removal": blocks["Limited servers"]
61         + blocks["Silenced servers"],
62     }
63
64 def get_friendica_blocks(domain: str) -> dict:
65     blocks = []
66
67     try:
68         doc = BeautifulSoup(
69             get(f"https://{domain}/friendica", headers=headers, timeout=5).text,
70             "html.parser",
71         )
72     except:
73         return {}
74
75     blocklist = doc.find(id="about_blocklist")
76     for line in blocklist.find("table").find_all("tr")[1:]:
77             blocks.append(
78                 {
79                     "domain": line.find_all("td")[0].text.strip(),
80                     "reason": line.find_all("td")[1].text.strip()
81                 }
82             )
83
84     return {
85         "reject": blocks
86     }
87
88 def get_pisskey_blocks(domain: str) -> dict:
89     blocks = {
90         "suspended": [],
91         "blocked": []
92     }
93
94     try:
95         counter = 0
96         step = 99
97         while True:
98             # iterating through all "suspended" (follow-only in its terminology) instances page-by-page, since that troonware doesn't support sending them all at once
99             try:
100                 if counter == 0:
101                     doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step}), headers=headers, timeout=5).json()
102                     if doc == []: raise
103                 else:
104                     doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"suspended":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
105                     if doc == []: raise
106                 for instance in doc:
107                     # just in case
108                     if instance["isSuspended"]:
109                         blocks["suspended"].append(
110                             {
111                                 "domain": instance["host"],
112                                 # no reason field, nothing
113                                 "reason": ""
114                             }
115                         )
116                 counter = counter + step
117             except:
118                 counter = 0
119                 break
120
121         while True:
122             # same shit, different asshole ("blocked" aka full suspend)
123             try:
124                 if counter == 0:
125                     doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step}), headers=headers, timeout=5).json()
126                     if doc == []: raise
127                 else:
128                     doc = post(f"https://{domain}/api/federation/instances", data=dumps({"sort":"+caughtAt","host":None,"blocked":True,"limit":step,"offset":counter-1}), headers=headers, timeout=5).json()
129                     if doc == []: raise
130                 for instance in doc:
131                     if instance["isBlocked"]:
132                         blocks["blocked"].append(
133                             {
134                                 "domain": instance["host"],
135                                 "reason": ""
136                             }
137                         )
138                 counter = counter + step
139             except:
140                 counter = 0
141                 break
142
143         return {
144             "reject": blocks["blocked"],
145             "followers_only": blocks["suspended"]
146         }
147
148     except:
149         return {}
150
151 def get_hash(domain: str) -> str:
152     return sha256(domain.encode("utf-8")).hexdigest()
153
154
155 def get_type(domain: str) -> str:
156     try:
157         res = get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
158         if res.status_code == 404:
159             res = get(f"https://{domain}/nodeinfo/2.0", headers=headers, timeout=5)
160         if res.status_code == 404:
161             res = get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
162         if res.ok and "text/html" in res.headers["content-type"]:
163             res = get(f"https://{domain}/nodeinfo/2.1", headers=headers, timeout=5)
164         if res.ok:
165             if res.json()["software"]["name"] in ["akkoma", "rebased"]:
166                 return "pleroma"
167             elif res.json()["software"]["name"] in ["hometown", "ecko"]:
168                 return "mastodon"
169             elif res.json()["software"]["name"] in ["calckey", "groundpolis"]:
170                 return "misskey"
171             else:
172                 return res.json()["software"]["name"]
173         elif res.status_code == 404:
174             res = get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
175         if res.ok:
176             return "mastodon"
177     except:
178         return None
179
180
181 conn = sqlite3.connect("blocks.db")
182 c = conn.cursor()
183
184 c.execute(
185     "select domain, software from instances where software in ('pleroma', 'mastodon', 'friendica', 'misskey')"
186 )
187
188 for blocker, software in c.fetchall():
189     if software == "pleroma":
190         print(blocker)
191         try:
192             # Blocks
193             federation = get(
194                 f"https://{blocker}/nodeinfo/2.1.json", headers=headers, timeout=5
195             ).json()["metadata"]["federation"]
196             if "mrf_simple" in federation:
197                 for block_level, blocks in (
198                     {**federation["mrf_simple"],
199                     **{"quarantined_instances": federation["quarantined_instances"]}}
200                 ).items():
201                     for blocked in blocks:
202                         if blocked == "":
203                             continue
204                         blocked == blocked.lower()
205                         blocker == blocker.lower()
206                         c.execute(
207                             "select domain from instances where domain = ?", (blocked,)
208                         )
209                         if c.fetchone() == None:
210                             c.execute(
211                                 "insert into instances select ?, ?, ?",
212                                 (blocked, get_hash(blocked), get_type(blocked)),
213                             )
214                         c.execute(
215                             "select * from blocks where blocker = ? and blocked = ? and block_level = ?",
216                             (blocker, blocked, block_level),
217                         )
218                         if c.fetchone() == None:
219                             c.execute(
220                                 "insert into blocks select ?, ?, '', ?",
221                                 (blocker, blocked, block_level),
222                             )
223             conn.commit()
224             # Reasons
225             if "mrf_simple_info" in federation:
226                 for block_level, info in (
227                     {**federation["mrf_simple_info"],
228                     **(federation["quarantined_instances_info"]
229                     if "quarantined_instances_info" in federation
230                     else {})}
231                 ).items():
232                     for blocked, reason in info.items():
233                         blocker == blocker.lower()
234                         blocked == blocked.lower()
235                         c.execute(
236                             "update blocks set reason = ? where blocker = ? and blocked = ? and block_level = ?",
237                             (reason["reason"], blocker, blocked, block_level),
238                         )
239             conn.commit()
240         except Exception as e:
241             print("error:", e, blocker)
242     elif software == "mastodon":
243         print(blocker)
244         try:
245             json = get_mastodon_blocks(blocker)
246             for block_level, blocks in json.items():
247                 for instance in blocks:
248                     blocked, blocked_hash, reason = instance.values()
249                     blocked == blocked.lower()
250                     blocker == blocker.lower()
251                     if blocked.count("*") <= 1:
252                         c.execute(
253                             "select hash from instances where hash = ?", (blocked_hash,)
254                         )
255                         if c.fetchone() == None:
256                             c.execute(
257                                 "insert into instances select ?, ?, ?",
258                                 (blocked, get_hash(blocked), get_type(blocked)),
259                             )
260                     c.execute(
261                         "select * from blocks where blocker = ? and blocked = ? and block_level = ?",
262                         (blocker, blocked if blocked.count("*") <= 1 else blocked_hash, block_level),
263                     )
264                     if c.fetchone() == None:
265                         c.execute(
266                             "insert into blocks select ?, ?, ?, ?",
267                             (
268                                 blocker,
269                                 blocked if blocked.count("*") <= 1 else blocked_hash,
270                                 reason,
271                                 block_level,
272                             ),
273                         )
274             conn.commit()
275         except Exception as e:
276             print("error:", e, blocker)
277     elif software == "friendica" or software == "misskey":
278         print(blocker)
279         try:
280             if software == "friendica":
281                 json = get_friendica_blocks(blocker)
282             elif software == "misskey":
283                 json = get_pisskey_blocks(blocker)
284             for block_level, blocks in json.items():
285                 for instance in blocks:
286                     blocked, reason = instance.values()
287                     blocked == blocked.lower()
288                     blocker == blocker.lower()
289                     c.execute(
290                         "select domain from instances where domain = ?", (blocked,)
291                     )
292                     if c.fetchone() == None:
293                         c.execute(
294                             "insert into instances select ?, ?, ?",
295                             (blocked, get_hash(blocked), get_type(blocked)),
296                         )
297                     c.execute(
298                         "select * from blocks where blocker = ? and blocked = ?",
299                         (blocker, blocked),
300                     )
301                     if c.fetchone() == None:
302                         c.execute(
303                             "insert into blocks select ?, ?, ?, ?",
304                             (
305                                 blocker,
306                                 blocked,
307                                 reason,
308                                 block_level,
309                             ),
310                         )
311             conn.commit()
312         except Exception as e:
313             print("error:", e, blocker)
314 conn.close()