git.mxchange.org Git - fba.git/blob - fetch_blocks.py
187d23ece2441b7a9a51bdd67577d90f64defba4
[fba.git] / fetch_blocks.py
1 from requests import get
2 from hashlib import sha256
3 import sqlite3
4 from bs4 import BeautifulSoup
5
# HTTP headers passed to every `get(...)` call in this script; the
# user-agent names the project and links to its repository.
headers = dict(
    [("user-agent", "fedi-block-api (https://gitlab.com/EnjuAihara/fedi-block-api)")]
)
9
def get_mastodon_blocks(domain: str) -> dict:
    """Scrape the moderated-server tables from an instance's ``/about/more`` page.

    Args:
        domain: Host name of the instance to query.

    Returns:
        A dict with the keys ``"reject"``, ``"media_removal"`` and
        ``"federated_timeline_removal"``, each mapping to a list of
        ``{"domain", "hash", "reason"}`` dicts. An empty dict is returned
        when the page cannot be fetched or parsed.
    """
    # Keyed by the <h3> heading text that introduces each table on the page.
    blocks = {
        "Suspended servers": [],
        "Filtered media": [],
        "Limited servers": [],
        "Silenced servers": [],
    }

    try:
        doc = BeautifulSoup(
            get(f"https://{domain}/about/more", headers=headers, timeout=5).text,
            "html.parser",
        )
    except Exception:
        # Best effort: an unreachable or broken instance yields no data.
        # (Not a bare ``except`` so Ctrl-C / SystemExit still propagate.)
        return {}

    for header in doc.find_all("h3"):
        entries = blocks.get(header.text)
        if entries is None:
            # Unrelated heading; do not even look for a table after it.
            continue

        table = header.find_next_sibling("table")
        if table is None:
            # Heading without a table: nothing to collect.
            continue

        # The first row is skipped (presumably the table header -- confirm).
        for line in table.find_all("tr")[1:]:
            span = line.find("span")
            cells = line.find_all("td")
            if span is None or len(cells) < 2:
                # Malformed row; skip it instead of aborting the whole scrape.
                continue
            title = span.get("title")
            if title is None:
                continue
            entries.append(
                {
                    "domain": span.text,
                    # Drops the first 9 characters of the title attribute
                    # (presumably a "SHA-256: " prefix -- TODO confirm).
                    "hash": title[9:],
                    "reason": cells[1].text.strip(),
                }
            )

    return {
        "reject": blocks["Suspended servers"],
        "media_removal": blocks["Filtered media"],
        "federated_timeline_removal": blocks["Limited servers"] + blocks["Silenced servers"],
    }
41
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of *domain* encoded as UTF-8."""
    hasher = sha256()
    hasher.update(domain.encode("utf-8"))
    return hasher.hexdigest()
44
def get_type(domain: str) -> "str | None":
    """Determine which server software *domain* runs.

    Tries ``/nodeinfo/2.1.json`` first, then ``/nodeinfo/2.0.json`` if that
    answers 404. If nodeinfo still answers 404, ``/api/v1/instance`` is
    probed and a successful response is reported as ``"mastodon"``.

    Args:
        domain: Host name of the instance to probe.

    Returns:
        The ``software.name`` value from nodeinfo, ``"mastodon"`` when only
        the instance API responds, or ``None`` when nothing could be
        determined (including network or parsing errors).
    """
    try:
        res = get(f"https://{domain}/nodeinfo/2.1.json", headers=headers, timeout=5)
        if res.status_code == 404:
            res = get(f"https://{domain}/nodeinfo/2.0.json", headers=headers, timeout=5)
        if res.ok:
            return res.json()["software"]["name"]
        if res.status_code == 404:
            res = get(f"https://{domain}/api/v1/instance", headers=headers, timeout=5)
            if res.ok:
                return "mastodon"
    except Exception:
        # Best effort: any request/JSON/key failure means "unknown".
        # (Not a bare ``except`` so Ctrl-C / SystemExit still propagate.)
        return None
    return None
58
def _ensure_instance(cursor, domain):
    """Insert *domain* into ``instances`` unless a row for it already exists."""
    cursor.execute("select domain from instances where domain = ?", (domain,))
    if cursor.fetchone() is None:
        cursor.execute(
            "insert into instances select ?, ?, ?",
            (domain, get_hash(domain), get_type(domain)),
        )


# Refresh the stored block lists of every known Pleroma and Mastodon instance.
conn = sqlite3.connect("blocks.db")
c = conn.cursor()

try:
    c.execute("select domain, software from instances where software in ('pleroma', 'mastodon')")

    for blocker, software in c.fetchall():
        if software == "pleroma":
            print(blocker)
            try:
                # Fetch before touching the database so a network failure
                # leaves the previously stored blocks intact.
                federation = get(f"https://{blocker}/nodeinfo/2.1.json", headers=headers, timeout=5).json()["metadata"]["federation"]

                # Blocks
                c.execute("delete from blocks where blocker = ?", (blocker,))
                if "mrf_simple" in federation:
                    # "quarantined_instances" may be absent; treat that as an
                    # empty list rather than failing the whole instance.
                    block_lists = federation["mrf_simple"] | {
                        "quarantined_instances": federation.get("quarantined_instances", [])
                    }
                    for block_level, blocks in block_lists.items():
                        for blocked in blocks:
                            if blocked == "":
                                continue
                            _ensure_instance(c, blocked)
                            c.execute("insert into blocks select ?, ?, '', ?", (blocker, blocked, block_level))
                conn.commit()

                # Reasons
                if "mrf_simple_info" in federation:
                    # The previous `A | B if cond else {}` form bound as
                    # `(A | B) if cond else {}` and dropped every reason when
                    # "quarantined_instances_info" was missing.
                    reason_lists = federation["mrf_simple_info"] | federation.get("quarantined_instances_info", {})
                    for block_level, info in reason_lists.items():
                        for blocked, reason in info.items():
                            c.execute("update blocks set reason = ? where blocker = ? and blocked = ? and block_level = ?", (reason["reason"], blocker, blocked, block_level))
                conn.commit()
            except Exception as e:
                # Discard the half-applied changes so they are not committed
                # together with the next instance's data.
                conn.rollback()
                print("error:", e, blocker)
        elif software == "mastodon":
            print(blocker)
            try:
                block_lists = get_mastodon_blocks(blocker)
                if not block_lists:
                    # get_mastodon_blocks returns {} only when the page could
                    # not be fetched; keep the existing rows in that case.
                    print("error:", "could not fetch block list", blocker)
                    continue

                c.execute("delete from blocks where blocker = ?", (blocker,))
                for block_level, entries in block_lists.items():
                    for blocked in entries:
                        if blocked["domain"].count("*") > 1:
                            # instance is censored, check if domain of hash is known, if not, insert the hash
                            c.execute("insert into blocks select ?, ifnull((select domain from instances where hash = ?), ?), ?, ?", (blocker, blocked["hash"], blocked["hash"], blocked["reason"], block_level))
                        else:
                            # instance is not censored; add it if not known yet
                            _ensure_instance(c, blocked["domain"])
                            c.execute("insert into blocks select ?, ?, ?, ?", (blocker, blocked["domain"], blocked["reason"], block_level))
                conn.commit()
            except Exception as e:
                conn.rollback()
                print("error:", e, blocker)
finally:
    # Close the connection even if the initial query or the loop fails.
    conn.close()