from fba.models import sources
from fba.networks import friendica
+from fba.networks import gotosocial
from fba.networks import lemmy
from fba.networks import mastodon
from fba.networks import misskey
elif args.only_none:
# Check only entries with total_blocked=None
logger.debug("Checking only entries with total_blocked=None ...")
- database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben') AND nodeinfo_url IS NOT NULL AND total_blocks IS NULL ORDER BY last_blocked ASC, total_blocks DESC")
+ database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben', 'gotosocial') AND nodeinfo_url IS NOT NULL AND total_blocks IS NULL ORDER BY last_blocked ASC, total_blocks DESC")
else:
# Re-check after "timeout" (aka. minimum interval)
logger.debug("Checking any federating software with possible blocklist ...")
- database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben') AND nodeinfo_url IS NOT NULL ORDER BY last_blocked ASC, total_blocks DESC")
+ database.cursor.execute("SELECT domain, software, origin, nodeinfo_url FROM instances WHERE software IN ('pleroma', 'mastodon', 'lemmy', 'friendica', 'misskey', 'piefed', 'typecho', 'neko', 'zuiriben', 'gotosocial') AND nodeinfo_url IS NOT NULL ORDER BY last_blocked ASC, total_blocks DESC")
# Load all rows
rows = [dict(row) for row in database.cursor.fetchall()]
elif row["software"] == "friendica":
blocking = friendica.fetch_blocks(row["domain"])
logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
+ elif row["software"] == "gotosocial":
+ blocking = gotosocial.fetch_blocks(row["domain"])
+ logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
elif row["software"] == "misskey":
blocking = misskey.fetch_blocks(row["domain"])
logger.debug("row[domain]='%s' returned %d entries,row[software]='%s'", row["domain"], len(blocking), row["software"])
elif row["software"] == "friendica":
logger.debug("Fetching blocks from row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"])
blocking = friendica.fetch_blocks(row["domain"])
+ elif row["software"] == "gotosocial":
+ logger.debug("Fetching blocks from row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"])
+ blocking = gotosocial.fetch_blocks(row["domain"])
elif row["software"] == "misskey":
logger.debug("Fetching blocks from row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"])
blocking = misskey.fetch_blocks(row["domain"])
--- /dev/null
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+
+import bs4
+
+from fba.helpers import blacklist
+from fba.helpers import domain as domain_helper
+from fba.helpers import tidyup
+
+from fba.http import network
+
+from fba.models import instances
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+#logger.setLevel(logging.DEBUG)
+
+def fetch_blocks(domain: str) -> list:
+ logger.debug("domain='%s' - CALLED!", domain)
+ domain_helper.raise_on(domain)
+
+ if blacklist.is_blacklisted(domain):
+ raise RuntimeError(f"domain='{domain}' is blacklisted but function was invoked")
+ elif not instances.is_registered(domain):
+ raise RuntimeError(f"domain='{domain}' is not registered but function was invoked")
+
+ blocklist = []
+ block_tag = None
+
+ try:
+ logger.debug("Fetching GotoSocial blocks from domain='%s' ...", domain)
+ raw = network.get_generic(
+ domain,
+ "/about/suspended"
+ ).text
+ logger.debug("Parsing %d Bytes ...", len(raw))
+
+ doc = bs4.BeautifulSoup(raw, features="html.parser")
+ logger.debug("doc[]='%s'", type(doc))
+
+ block_tag = doc.find("section")
+ logger.debug("block_tag[%s]='%s'", type(block_tag), block_tag)
+ except network.exceptions as exception:
+ logger.warning("Exception '%s' during fetching instances from domain='%s'", type(exception), domain)
+ instances.set_last_error(domain, exception)
+
+ logger.debug("Returning empty list ... - EXIT!")
+ return []
+
+ logger.debug("block_tag[%s]='%s'", type(block_tag), block_tag)
+ if block_tag is None:
+ logger.debug("Instance has no block list: domain='%s' - EXIT!", domain)
+ return []
+
+ # Init local variables
+ rows = ()
+
+ # Try to find table
+ table = block_tag.find("div", {"class": "domain-blocklist"})
+
+ logger.debug("table[]='%s'", type(table))
+ if table is None:
+ logger.warning("domain='%s' has no table tag - EXIT !", domain)
+ return []
+
+ # Find all rows in table
+ rows = table.find_all("div", {"class": "entry"})
+
+ logger.debug("Found rows[%s]()=%d", type(rows), len(rows))
+ for line in rows:
+ logger.debug("line[%s]='%s'", type(line), line)
+ if line.find({"class": "header"}):
+ logger.debug("line='%s' is a header - SKIPPED!", line)
+ continue
+
+ blocked = line.find("div", {"class": "domain"})
+ reason = line.find("div", {"class": "public_comment"})
+ if blocked is None:
+ logger.warning("domain='%s' has a blocklist entry with an empty blocked domain! line='%s' - SKIPPED!", domain, line)
+ continue
+
+ logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason)
+ blocked = tidyup.domain(blocked.text)
+ reason = tidyup.reason(reason.text) if reason is not None else None
+ logger.debug("blocked='%s',reason='%s' - AFTER!", blocked, reason)
+
+ if blocked in [None, ""]:
+ logger.warning("line[]='%s' returned empty blocked domain - SKIPPED!", type(line))
+ continue
+ elif not domain_helper.is_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
+ continue
+
+ logger.debug("Appending blocked='%s',reason='%s'", blocked, reason)
+ blocklist.append({
+ "blocker" : domain,
+ "blocked" : blocked,
+ "reason" : reason,
+ "block_level": "rejected",
+ })
+
+ logger.debug("blocklist()=%d - EXIT!", len(blocklist))
+ return blocklist