-from requests import get
-from requests import post
+from reqto import get
+from reqto import post
from hashlib import sha256
import sqlite3
from bs4 import BeautifulSoup
from json import loads
import re
from time import time
+import itertools
with open("config.json") as f:
config = loads(f.read())
"user-agent": config["useragent"]
}
+def send_bot_post(instance: str, blocks: dict):
+ message = instance + " has blocked the following instances:\n\n"
+ truncated = False
+ if len(blocks) > 20:
+ truncated = True
+ blocks = blocks[0 : 19]
+ for block in blocks:
+ if block["reason"] == None or block["reason"] == '':
+ message = message + block["blocked"] + " with unspecified reason\n"
+ else:
+ message = message + block["blocked"] + ' for "' + block["reason"] + '"\n'
+ if truncated:
+ message = message + "(the list has been truncated to the first 20 entries)"
+
+ botheaders = {**headers, **{"Authorization": "Bearer " + config["bot_token"]}}
+ req = post(f"{config['bot_instance']}/api/v1/statuses",
+ data={"status":message, "visibility":config['bot_visibility'], "content_type":"text/plain"},
+ headers=botheaders, timeout=10).json()
+ print(req)
+ return True
def get_mastodon_blocks(domain: str) -> dict:
blocks = {
c = conn.cursor()
c.execute(
- "select domain, software from instances where software in ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial')"
+ #"select domain, software from instances where software in ('pleroma', 'mastodon', 'friendica', 'misskey', 'gotosocial')"
+ "select domain, software from instances where domain = 'mstdn.social'"
)
for blocker, software in c.fetchall():
+ blockdict = []
blocker = tidyup(blocker)
if software == "pleroma":
print(blocker)
"insert into blocks select ?, ?, '', ?, ?, ?",
(blocker, blocked, block_level, timestamp, timestamp),
)
+ if block_level == "reject":
+ blockdict.append(
+ {
+ "blocked": blocked,
+ "reason": None
+ })
else:
c.execute(
"update blocks set last_seen = ? where blocker = ? and blocked = ? and block_level = ?",
"update blocks set reason = ? where blocker = ? and blocked = ? and block_level = ? and reason = ''",
(reason["reason"], blocker, blocked, block_level),
)
+ for entry in blockdict:
+ if entry["blocked"] == blocked:
+ entry["reason"] = reason["reason"]
+
conn.commit()
except Exception as e:
print("error:", e, blocker)
"followers_only": [],
"report_removal": []
}
+
+ # handling CSRF, I've saw at least one server requiring it to access the endpoint
+ meta = BeautifulSoup(
+ get(f"https://{blocker}/about", headers=headers, timeout=5).text,
+ "html.parser",
+ )
+ try:
+ csrf = meta.find("meta", attrs={"name": "csrf-token"})["content"]
+ reqheaders = {**headers, **{"x-csrf-token": csrf}}
+ except:
+ reqheaders = headers
+
blocks = get(
- f"https://{blocker}/api/v1/instance/domain_blocks", headers=headers, timeout=5
+ f"https://{blocker}/api/v1/instance/domain_blocks", headers=reqheaders, timeout=5
).json()
for block in blocks:
entry = {'domain': block['domain'], 'hash': block['digest'], 'reason': block['comment']}
timestamp,
),
)
+ if block_level == "reject":
+ blockdict.append(
+ {
+ "blocked": blocked,
+ "reason": reason
+ })
else:
c.execute(
"update blocks set last_seen = ? where blocker = ? and blocked = ? and block_level = ?",
timestamp = int(time())
c.execute(
- "select * from blocks where blocker = ? and blocked = ? and reason = ?",
- (blocker, blocked, reason),
+ "select * from blocks where blocker = ? and blocked = ?",
+ (blocker, blocked),
)
if c.fetchone() == None:
c.execute(
timestamp
),
)
+ if block_level == "reject":
+ blockdict.append(
+ {
+ "blocked": blocked,
+ "reason": reason
+ })
else:
c.execute(
"update blocks set last_seen = ? where blocker = ? and blocked = ? and block_level = ?",
"insert into blocks select ?, ?, ?, ?, ?, ?",
(blocker, blocked, "", "reject", timestamp, timestamp),
)
+ blockdict.append(
+ {
+ "blocked": blocked,
+ "reason": None
+ })
else:
c.execute(
"update blocks set last_seen = ? where blocker = ? and blocked = ? and block_level = ?",
"update blocks set reason = ? where blocker = ? and blocked = ? and block_level = ? and reason = ''",
(reason, blocker, blocked, "reject"),
)
+ for entry in blockdict:
+ if entry["blocked"] == blocked:
+ entry["reason"] = reason
conn.commit()
except Exception as e:
print("error:", e, blocker)
+
+ if config["bot_enabled"] and len(blockdict) > 0:
+ send_bot_post(blocker, blockdict)
+ blockdict = []
+
conn.close()