except:
return None
+def tidyup(domain: str) -> str:
+ # some retards put their blocks in variable case
+ domain = domain.lower()
+ # other retards put the port
+ domain = re.sub("\:\d+$", "", domain)
+ # bigger retards put the schema in their blocklist, sometimes even without slashes
+ domain = re.sub("^https?\:(\/*)", "", domain)
+ # the biggest retards of them all try to block individual users
+ domain = re.sub("(.+)\@", "", domain)
+ return domain
conn = sqlite3.connect("blocks.db")
c = conn.cursor()
)
for blocker, software in c.fetchall():
+ blocker = tidyup(blocker)
if software == "pleroma":
print(blocker)
try:
**{"quarantined_instances": federation["quarantined_instances"]}}
).items():
for blocked in blocks:
+ blocked = tidyup(blocked)
if blocked == "":
continue
- blocked = blocked.lower()
- blocker = blocker.lower()
+ if blocked.count("*") > 1:
+ # -ACK!-oma also started obscuring domains without hash
+ c.execute(
+ "select domain from instances where domain like ? order by rowid limit 1", (blocked.replace("*", "_"),)
+ )
+ searchres = c.fetchone()
+ if searchres != None:
+ blocked = searchres[0]
+
c.execute(
"select domain from instances where domain = ?", (blocked,)
)
else {})}
).items():
for blocked, reason in info.items():
- blocker = blocker.lower()
- blocked = blocked.lower()
+ blocked = tidyup(blocked)
c.execute(
"update blocks set reason = ? where blocker = ? and blocked = ? and block_level = ?",
(reason["reason"], blocker, blocked, block_level),
for block_level, blocks in json.items():
for instance in blocks:
blocked, blocked_hash, reason = instance.values()
- blocked = blocked.lower()
- blocker = blocker.lower()
+ blocked = tidyup(blocked)
if blocked.count("*") <= 1:
c.execute(
"select hash from instances where hash = ?", (blocked_hash,)
"insert into instances select ?, ?, ?",
(blocked, get_hash(blocked), get_type(blocked)),
)
+ else:
+ # Doing the hash search for instance names as well to tidy up DB
+ c.execute(
+ "select domain from instances where hash = ?", (blocked_hash,)
+ )
+ searchres = c.fetchone()
+ if searchres != None:
+ blocked = searchres[0]
+
c.execute(
"select * from blocks where blocker = ? and blocked = ? and block_level = ?",
(blocker, blocked if blocked.count("*") <= 1 else blocked_hash, block_level),
for block_level, blocks in json.items():
for instance in blocks:
blocked, reason = instance.values()
- blocked = blocked.lower()
- blocker = blocker.lower()
+ blocked = tidyup(blocked)
c.execute(
"select domain from instances where domain = ?", (blocked,)
)
).json()
for peer in federation:
blocked = peer["domain"].lower()
- blocker = blocker.lower()
if blocked.count("*") > 0:
# GTS does not have hashes for obscured domains, so we have to guess it