git.mxchange.org Git - fba.git/blob - fetch_blocks.py
7a3de5f15f910792c6517b4d398ff0e2667370ab
[fba.git] / fetch_blocks.py
1 from requests import get
2 from hashlib import sha256
3 import sqlite3
4 from bs4 import BeautifulSoup
5
# HTTP headers passed to every outgoing request made by this script; the
# user-agent string names the project and links to its repository.
_USER_AGENT = "fedi-block-api (https://gitlab.com/EnjuAihara/fedi-block-api)"
headers = {"user-agent": _USER_AGENT}
9
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the block tables from an instance's ``/about/more`` page.

    The page is searched for ``<h3>`` headings whose text is one of
    "Suspended servers", "Filtered media", "Limited servers" or
    "Silenced servers"; the rows of the first ``<table>`` sibling following
    such a heading are collected as block entries.

    Args:
        domain: Host name of the instance to query.

    Returns:
        A dict with the keys ``"reject"``, ``"media_removal"`` and
        ``"federated_timeline_removal"``, each holding a list of
        ``{"domain": ..., "hash": ..., "reason": ...}`` dicts. An empty dict
        is returned when the page could not be fetched or parsed.
    """
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    try:
        doc = BeautifulSoup(
            get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        # Best effort: an unreachable or broken instance yields "no data".
        # ``Exception`` (not a bare except) so Ctrl-C still stops the script.
        return {}

    for header in doc.find_all("h3"):
        entries = blocks.get(header.text)
        if entries is None:
            # Heading is not one of the block-list sections.
            continue

        table = header.find_next_sibling("table")
        if table is None:
            # Section heading without a table: nothing to collect.
            continue

        # The first row is skipped (treated as the table's header row).
        for row in table.find_all("tr")[1:]:
            span = row.find("span")
            cells = row.find_all("td")
            if span is None or not span.has_attr("title") or len(cells) < 2:
                # Malformed row; skip it instead of aborting the whole scrape.
                continue
            entries.append(
                {
                    "domain": span.text,
                    # Drops the first 9 characters of the title attribute --
                    # presumably a "SHA-256: " label in front of the digest;
                    # TODO confirm against the instance's markup.
                    "hash": span["title"][9:],
                    "reason": cells[1].text.strip(),
                }
            )

    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "federated_timeline_removal": blocks["Limited servers"] + blocks["Silenced servers"],
    }
41
42
def get_type(domain: str) -> "str | None":
    """Determine which server software ``domain`` runs.

    Tries ``/nodeinfo/2.1.json`` first and, if that answers 404,
    ``/nodeinfo/2.0.json``. A successful nodeinfo response supplies the name
    via ``["software"]["name"]``. If nodeinfo still answers 404, the instance
    is probed at ``/api/v1/instance``; a successful response there is taken
    to mean "mastodon".

    Args:
        domain: Host name of the instance to probe.

    Returns:
        The software name, or ``None`` when it could not be determined
        (request failure, unexpected response body, or no endpoint answered
        successfully).
    """
    try:
        res = get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
        if res.status_code == 404:
            res = get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
        if res.ok:
            return res.json()["software"]["name"]
        if res.status_code == 404:
            # No nodeinfo document at all; fall back to the instance API.
            res = get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
            if res.ok:
                return "mastodon"
    except Exception:
        # Best effort: network errors, invalid JSON or missing keys all mean
        # "unknown". ``Exception`` (not a bare except) keeps Ctrl-C working.
        return None
    return None
56
def _ensure_instance(cursor, domain: str) -> None:
    """Insert ``domain`` into ``instances`` unless a row for it already exists."""
    cursor.execute("select domain from instances where domain = ?", (domain,))
    if cursor.fetchone() is None:
        cursor.execute(
            "insert into instances select ?, ?, ?",
            (domain, sha256(bytes(domain, "utf-8")).hexdigest(), get_type(domain)),
        )


def _update_pleroma_blocks(connection, cursor, blocker: str) -> None:
    """Replace the stored blocks of a Pleroma ``blocker`` from its nodeinfo 2.1 document."""
    cursor.execute("delete from blocks where blocker = ?", (blocker,))
    nodeinfo = get(f"https://{blocker}/nodeinfo/2.1.json", headers=headers, timeout=5).json()
    federation = nodeinfo["metadata"]["federation"]

    # Blocks: one entry per (policy name, blocked domain) pair.
    if "mrf_simple" in federation:
        for mrf, blocked_domains in federation["mrf_simple"].items():
            for blocked in blocked_domains:
                if blocked == "":
                    continue
                _ensure_instance(cursor, blocked)
                cursor.execute("insert into blocks select ?, ?, '', ?", (blocker, blocked, mrf))

    # Quarantined instances
    if "quarantined_instances" in federation:
        for blocked in federation["quarantined_instances"]:
            if blocked == "":
                continue
            _ensure_instance(cursor, blocked)
            cursor.execute(
                "insert into blocks select ?, ?, '', 'quarantined_instances'",
                (blocker, blocked),
            )

    # Persist the blocks before reasons are attached, so a malformed reasons
    # section does not discard the block list itself.
    connection.commit()

    # Reasons
    if "mrf_simple_info" in federation:
        for mrf, reasons in federation["mrf_simple_info"].items():
            for blocked, info in reasons.items():
                cursor.execute(
                    "update blocks set reason = ? where blocker = ? and blocked = ? and block_level = ?",
                    (info["reason"], blocker, blocked, mrf),
                )
    if "quarantined_instances_info" in federation:
        quarantined = federation["quarantined_instances_info"]["quarantined_instances"]
        for blocked, info in quarantined.items():
            cursor.execute(
                "update blocks set reason = ? where blocker = ? and blocked = ? and block_level = 'quarantined_instances'",
                (info["reason"], blocker, blocked),
            )
    connection.commit()


def _update_mastodon_blocks(connection, cursor, blocker: str) -> None:
    """Replace the stored blocks of a Mastodon ``blocker`` from its scraped block tables."""
    block_lists = get_mastodon_blocks(blocker)
    if not block_lists:
        # An empty dict means the page could not be fetched. Keep what is
        # already stored instead of wiping it because of a transient failure.
        print("error:", "could not fetch block list", blocker)
        return

    cursor.execute("delete from blocks where blocker = ?", (blocker,))
    for block_level, entries in block_lists.items():
        for blocked in entries:
            if blocked["domain"].count("*") > 1:
                # instance is censored, check if domain of hash is known, if not, insert the hash
                cursor.execute(
                    "insert into blocks select ?, ifnull((select domain from instances where hash = ?), ?), ?, ?",
                    (blocker, blocked["hash"], blocked["hash"], blocked["reason"], block_level),
                )
            else:
                # instance is not censored; add it to instances if not yet known
                _ensure_instance(cursor, blocked["domain"])
                cursor.execute(
                    "insert into blocks select ?, ?, ?, ?",
                    (blocker, blocked["domain"], blocked["reason"], block_level),
                )
    connection.commit()


# Script body: refresh the block lists of every known Pleroma and Mastodon
# instance recorded in blocks.db.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()

try:
    c.execute("select domain, software from instances where software in ('pleroma', 'mastodon')")

    for blocker, software in c.fetchall():
        print(blocker)
        try:
            if software == "pleroma":
                _update_pleroma_blocks(conn, c, blocker)
            elif software == "mastodon":
                _update_mastodon_blocks(conn, c, blocker)
        except Exception as e:
            # Discard the uncommitted delete/inserts of the failed blocker so
            # a later commit cannot persist a half-updated block list.
            conn.rollback()
            print("error:", e, blocker)
finally:
    conn.close()