time.time()
),
)
+
+ connection.commit()
except BaseException as e:
print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}',exception[{type(e)}]:'{str(e)}'")
sys.exit(255)
# DEBUG: print(f"DEBUG: string='{string}' - EXIT!")
return string
+
+def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
+ # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
+ if type(blocker) != str:
+ raise ValueError(f"Parameter blocker[]={type(blocker)} is not of type 'str'")
+ elif blocker == "":
+ raise ValueError("Parameter 'blocker' cannot be empty")
+ elif type(blocked) != str:
+ raise ValueError(f"Parameter blocked[]={type(blocked)} is not of type 'str'")
+ elif blocked == "":
+ raise ValueError("Parameter 'blocked' cannot be empty")
+ elif type(block_level) != str:
+ raise ValueError(f"Parameter block_level[]={type(block_level)} is not of type 'str'")
+ elif block_level == "":
+ raise ValueError("Parameter 'block_level' cannot be empty")
+
+ cursor.execute(
+ "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
+ (
+ blocker,
+ blocked,
+ block_level
+ ),
+ )
+
+ is_blocked = cursor.fetchone() != None
+
+ # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!")
+ return is_blocked
# DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
- fba.cursor.execute(
- "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
- (
- blocker,
- blocked,
- block_level
- ),
- )
-
- if fba.cursor.fetchone() == None:
+ if not fba.is_instance_blocked(blocker, blocked, block_level):
# DEBUG: print("DEBUG: Blocking:", blocker, blocked, block_level)
fba.block_instance(blocker, blocked, "unknown", block_level)
blocking = blocked if blocked.count("*") <= 1 else blocked_hash
# DEBUG: print(f"DEBUG: blocking='{blocking}',blocked='{blocked}',blocked_hash='{blocked_hash}'")
- fba.cursor.execute(
- "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
- (
- blocker,
- blocking,
- block_level
- ),
- )
-
- if fba.cursor.fetchone() == None:
+ if not fba.is_instance_blocked(blocker, blocked, block_level):
# DEBUG: print("DEBUG: Blocking:", blocker, blocked, block_level)
fba.block_instance(blocker, blocking, reason, block_level)
# DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
- fba.cursor.execute(
- "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
- (blocker, blocked, block_level),
- )
-
- if fba.cursor.fetchone() == None:
+ if not fba.is_instance_blocked(blocker, blocked, block_level):
fba.block_instance(blocker, blocked, reason, block_level)
if block_level == "reject":
# DEBUG: print(f"DEBUG: Domain blocked='{blocked}' wasn't found, adding ..., blocker='{blocker}',origin='{origin}',nodeinfo_url='{nodeinfo_url}'")
fba.add_instance(blocked, blocker, sys.argv[0], nodeinfo_url)
- fba.cursor.execute(
- "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
- (
- blocker,
- blocked,
- "reject"
- ),
- )
-
- if fba.cursor.fetchone() == None:
+ if not fba.is_instance_blocked(blocker, blocked, "reject"):
# DEBUG: print(f"DEBUG: blocker='{blocker}' is blocking '{blocked}' for unknown reason at this point")
fba.block_instance(blocker, blocked, "unknown", "reject")
--- /dev/null
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import bs4
+import json
+import reqto
+import sys
+import validators
+from fba import *
+
+def find_domains(tag: bs4.element.Tag) -> list:
+ # DEBUG: print(f"DEBUG: tag[]={type(tag)} - CALLED!")
+ if not isinstance(tag, bs4.element.Tag):
+ raise ValueError(f"Parameter tag[]={type(tag)} is not type of bs4.element.Tag")
+ elif not isinstance(tag, bs4.element.Tag):
+ raise KeyError("Cannot find table with instances!")
+ elif len(tag.select("tr")) == 0:
+ raise KeyError("No table rows found in table!")
+
+ domains = list()
+ for element in tag.select("tr"):
+ # DEBUG: print(f"DEBUG: element[]={type(element)}")
+ if not element.find("td"):
+ # DEBUG: print("DEBUG: Skipping element, no <td> found")
+ continue
+
+ domain = fba.tidyup(element.find("td").text)
+ reason = element.findAll("td")[1].text
+
+ # DEBUG: print(f"DEBUG: domain='{domain}',reason='{reason}'")
+
+ if fba.is_blacklisted(domain):
+ print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
+ continue
+ elif not validators.domain(domain):
+ print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
+ continue
+
+ # DEBUG: print(f"DEBUG: Adding domain='{domain}' ...")
+ domains.append({
+ "domain": domain,
+ "reason": reason,
+ })
+
+ # DEBUG: print(f"DEBUG: domains()={len(domains)} - EXIT!")
+ return domains
+
+boot.acquire_lock()
+
+domains = {
+ "silenced": list(),
+ "blocked": list(),
+}
+
+try:
+ doc = bs4.BeautifulSoup(
+ reqto.get("https://meta.chaos.social/federation", headers=fba.headers, timeout=(fba.config["connection_timeout"], fba.config["read_timeout"])).text,
+ "html.parser",
+ )
+ # DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
+ silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table")
+
+ # DEBUG: print(f"DEBUG: silenced[]={type(silenced)}")
+ domains["silenced"] = domains["silenced"] + find_domains(silenced)
+ blocked = doc.find("h2", {"id": "blocked-instances"}).findNext("table")
+
+ # DEBUG: print(f"DEBUG: blocked[]={type(blocked)}")
+ domains["blocked"] = domains["blocked"] + find_domains(blocked)
+
+except BaseException as e:
+ print(f"ERROR: Cannot fetch from meta.chaos.social,exception[{type(e)}]:'{str(e)}'")
+ sys.exit(255)
+
+# Show domains
+# DEBUG: print(f"DEBUG: domains()={len(domains)}")
+if len(domains) > 0:
+ print(f"INFO: Adding {len(domains)} new instances ...")
+ for block_level in domains:
+ # DEBUG: print(f"DEBUG: block_level='{block_level}'")
+
+ for row in domains[block_level]:
+ # DEBUG: print(f"DEBUG: row='{row}'")
+ if not fba.is_instance_registered(row["domain"]):
+ print(f"INFO: Fetching instances from domain='{row['domain']}' ...")
+ fba.fetch_instances(row["domain"], None, None, sys.argv[0])
+
+ if not fba.is_instance_blocked('chaos.social', row["domain"], block_level):
+ # DEBUG: print(f"DEBUG: domain='{row['domain']}',block_level='{block_level}' blocked by chaos.social, adding ...")
+ fba.block_instance('chaos.social', row["domain"], row["reason"], block_level)
+
+boot.shutdown()