From d33d519d455eed2f50128f16a85dac59c2c28f99 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Sun, 13 Jul 2025 00:30:24 +0200 Subject: [PATCH] Continued: - added support for GotoSocial's HTML blocklist - added GotoSocial then also potenial blocklist source --- fba/commands.py | 11 +++- fba/helpers/tidyup.py | 5 ++ fba/networks/__init__.py | 1 + fba/networks/gotosocial.py | 118 +++++++++++++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 fba/networks/gotosocial.py diff --git a/fba/commands.py b/fba/commands.py index 5b02476..b24e4c1 100644 --- a/fba/commands.py +++ b/fba/commands.py @@ -54,6 +54,7 @@ from fba.models import obfuscation from fba.models import sources from fba.networks import friendica +from fba.networks import gotosocial from fba.networks import lemmy from fba.networks import mastodon from fba.networks import misskey @@ -301,11 +302,11 @@ def fetch_blocks(args: argparse.Namespace) -> int: elif args.only_none: # Check only entries with total_blocked=None logger.debug("Checking only entries with total_blocked=None ...") - database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben') AND nodeinfo_url IS NOT NULL AND total_blocks IS NULL ORDER BY last_blocked ASC, total_blocks DESC") + database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben', 'gotosocial') AND nodeinfo_url IS NOT NULL AND total_blocks IS NULL ORDER BY last_blocked ASC, total_blocks DESC") else: # Re-check after "timeout" (aka. minimum interval) logger.debug("Checking any federating software with possible blocklist ...") - database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben') AND nodeinfo_url IS NOT NULL ORDER BY last_blocked ASC, total_blocks DESC") + database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben', 'gotosocial') AND nodeinfo_url IS NOT NULL ORDER BY last_blocked ASC, total_blocks DESC") # Load all rows rows = [dict(row) for row in database.cursor.fetchall()] @@ -352,6 +353,9 @@ def fetch_blocks(args: argparse.Namespace) -> int: elif row["software"] == "friendica": blocking = friendica.fetch_blocks(row["domain"]) logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"]) + elif row["software"] == "gotosocial": + blocking = gotosocial.fetch_blocks(row["domain"]) + logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"]) elif row["software"] == "misskey": blocking = misskey.fetch_blocks(row["domain"]) logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"]) @@ -1398,6 +1402,9 @@ def recheck_obfuscation(args: argparse.Namespace) -> int: elif row["software"] == "friendica": logger.debug("Fetching blocks from row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"]) blocking = friendica.fetch_blocks(row["domain"]) + elif row["software"] == "gotosocial": + logger.debug("Fetching blocks from row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"]) + blocking = gotosocial.fetch_blocks(row["domain"]) elif row["software"] == "misskey": logger.debug("Fetching blocks from row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"]) blocking = misskey.fetch_blocks(row["domain"]) diff --git a/fba/helpers/tidyup.py b/fba/helpers/tidyup.py index 02932ca..1593e46 100644 --- a/fba/helpers/tidyup.py +++ b/fba/helpers/tidyup.py @@ -30,6 +30,11 @@ def reason(string: str) -> str: # Strip string string = string.strip() + # may come from GotoSocial + logger.debug("string='%s' - #1", string) + if string == "": + string = None + logger.debug("string='%s' - EXIT!", string) return string diff --git a/fba/networks/__init__.py b/fba/networks/__init__.py index 4156fac..2352f78 100644 --- a/fba/networks/__init__.py +++ b/fba/networks/__init__.py @@ -16,6 +16,7 @@ __all__ = [ 'friendica', + 'gotosocial', 'lemmy', 'mastodon', 'misskey', diff --git a/fba/networks/gotosocial.py b/fba/networks/gotosocial.py new file mode 100644 index 0000000..6b7d5ec --- /dev/null +++ b/fba/networks/gotosocial.py @@ -0,0 +1,118 @@ +# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes +# Copyright (C) 2023 Free Software Foundation +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import logging + +import bs4 + +from fba.helpers import blacklist +from fba.helpers import domain as domain_helper +from fba.helpers import tidyup + +from fba.http import network + +from fba.models import instances + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +#logger.setLevel(logging.DEBUG) + +def fetch_blocks(domain: str) -> list: + logger.debug("domain='%s' - CALLED!", domain) + domain_helper.raise_on(domain) + + if blacklist.is_blacklisted(domain): + raise RuntimeError(f"domain='{domain}' is blacklisted but function was invoked") + elif not instances.is_registered(domain): + raise RuntimeError(f"domain='{domain}' is not registered but function was invoked") + + blocklist = [] + block_tag = None + + try: + logger.debug("Fetching GotoSocial blocks from domain='%s' ...", domain) + raw = network.get_generic( + domain, + "/about/suspended" + ).text + logger.debug("Parsing %d Bytes ...", len(raw)) + + doc = bs4.BeautifulSoup(raw, features="html.parser") + logger.debug("doc[]='%s'", type(doc)) + + block_tag = doc.find("section") + logger.debug("block_tag[%s]='%s'", type(block_tag), block_tag) + except network.exceptions as exception: + logger.warning("Exception '%s' during fetching instances from domain='%s'", type(exception), domain) + instances.set_last_error(domain, exception) + + logger.debug("Returning empty list ... - EXIT!") + return [] + + logger.debug("block_tag[%s]='%s'", type(block_tag), block_tag) + if block_tag is None: + logger.debug("Instance has no block list: domain='%s' - EXIT!", domain) + return [] + + # Init local variables + rows = () + + # Try to find table + table = block_tag.find("div", {"class": "domain-blocklist"}) + + logger.debug("table[]='%s'", type(table)) + if table is None: + logger.warning("domain='%s' has no table tag - EXIT !", domain) + return [] + + # Find all rows in table + rows = table.find_all("div", {"class": "entry"}) + + logger.debug("Found rows[%s]()=%d", type(rows), len(rows)) + for line in rows: + logger.debug("line[%s]='%s'", type(line), line) + if line.find({"class": "header"}): + logger.debug("line='%s' is a header - SKIPPED!", line) + continue + + blocked = line.find("div", {"class": "domain"}) + reason = line.find("div", {"class": "public_comment"}) + if blocked is None: + logger.warning("domain='%s' has a blocklist entry with an empty blocked domain! line='%s' - SKIPPED!", domain, line) + continue + + logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason) + blocked = tidyup.domain(blocked.text) + reason = tidyup.reason(reason.text) if reason is not None else None + logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason) + + if blocked in [None, ""]: + logger.warning("line[]='%s' returned empty blocked domain - SKIPPED!", type(line)) + continue + elif not domain_helper.is_wanted(blocked): + logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) + continue + + logger.debug("Appending blocked='%s',reason='%s'", blocked, reason) + blocklist.append({ + "blocker" : domain, + "blocked" : blocked, + "reason" : reason, + "block_level": "rejected", + }) + + logger.debug("blocklist()=%d - EXIT!", len(blocklist)) + return blocklist -- 2.39.5