From 98bcfa162822e862da7713742a67776a7ad8afbf Mon Sep 17 00:00:00 2001
From: =?utf8?q?Roland=20H=C3=A4der?=
Date: Tue, 11 Jul 2023 06:42:34 +0200
Subject: [PATCH] Continued:

- blacklisted hexbear.net, as its JavaScript contains shell commands plus
  broken JSON inside that script
- added parsing of JSON from JavaScript starting with 'isoData' (encapsulated
  in the function parse_script())
---
 fba/helpers/blacklist.py |   2 +
 fba/networks/lemmy.py    | 101 +++++++++++++++++++++++++++++++++++++--
 2 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/fba/helpers/blacklist.py b/fba/helpers/blacklist.py
index 18fda14..12c1655 100644
--- a/fba/helpers/blacklist.py
+++ b/fba/helpers/blacklist.py
@@ -42,6 +42,8 @@ blacklist = [
     "netlify.app",
     # block flooder
     "everyoneattack.com",
+    # CSRF
+    "hexbear.net", # See script in /instances
 ]
 
 def is_blacklisted(domain: str) -> bool:
diff --git a/fba/networks/lemmy.py b/fba/networks/lemmy.py
index fc4def0..be1b12d 100644
--- a/fba/networks/lemmy.py
+++ b/fba/networks/lemmy.py
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+import json
 import logging
 
 import bs4
@@ -145,7 +146,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
     headers = doc.findAll("div", {"class": "home-instances container-lg"})
     found = None
 
-    logger.debug("Search in %d header(s) ...", len(headers))
+    logger.debug("Checking %d header(s) ...", len(headers))
     for header in headers:
         logger.debug("header[]='%s'", type(header))
         content = header.contents[0]
@@ -164,8 +165,11 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
 
     logger.debug("found[]='%s'", type(found))
     if found is None:
-        logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
-        return blocklist
+        logger.info("domain='%s' has no HTML blocklist, checking scripts ...", domain)
+        blocking = parse_script(doc, "blocked")
+
+        logger.debug("blocking()=%d - EXIT!", len(blocking))
+        return blocking
 
     blocking = found.find_next(["ul","table"]).findAll("a")
     logger.debug("Found %d blocked instance(s) ...", len(blocking))
@@ -245,7 +249,12 @@ def fetch_instances(domain: str, origin: str) -> list:
                 logger.debug("Appending peer='%s' ...", peer)
                 peers.append(peer)
 
-        logger.debug("Marking domain='%s' as successfully handled ...", domain)
+        logger.debug("peers()=%d", len(peers))
+        if len(peers) == 0:
+            logger.debug("Found no peers for domain='%s', trying script tag ...", domain)
+            peers = parse_script(doc)
+
+        logger.debug("Marking domain='%s' as successfully handled, peers()=%d ...", domain, len(peers))
         instances.set_success(domain)
 
     except network.exceptions as exception:
@@ -254,3 +263,87 @@ def fetch_instances(domain: str, origin: str) -> list:
 
     logger.debug("peers()=%d - EXIT!", len(peers))
     return peers
+
+def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
+    logger.debug("doc[]='%s',only='%s' - CALLED!", type(doc), only)
+    if not isinstance(doc, bs4.BeautifulSoup):
+        raise ValueError(f"Parameter doc[]='{type(doc)}' is not of type 'bs4.BeautifulSoup'")
+    elif not isinstance(only, str) and only is not None:
+        raise ValueError(f"Parameter only[]='{type(only)}' is not of type 'str'")
+    elif isinstance(only, str) and only == "":
+        raise ValueError("Parameter 'only' is empty")
+
+    scripts = doc.find_all("script")
+    peers = list()
+
+    logger.debug("scripts()=%d", len(scripts))
+    for script in scripts:
+        logger.debug("script[%s].contents()=%d", type(script), len(script.contents))
+        if len(script.contents) == 0:
+            logger.debug("script has no contents - SKIPPED!")
contents - SKIPPED!") + continue + elif not script.contents[0].startswith("window.isoData"): + logger.debug("script.contents[0]='%s' does not start with window.isoData - SKIPPED!", script.contents[0]) + continue + + logger.debug("script.contents[0][]='%s'", type(script.contents[0])) + + isoData = script.contents[0].split("=")[1].strip().replace(":undefined", ":\"undefined\"") + logger.debug("isoData[%s]='%s'", type(isoData), isoData) + + parsed = None + try: + parsed = json.loads(isoData) + except json.decoder.JSONDecodeError as exception: + logger.warning("Exception '%s' during parsing %d Bytes: '%s'", type(exception), len(isoData), str(exception)) + return list() + + logger.debug("parsed[%s]()=%d", type(parsed), len(parsed)) + + if "routeData" not in parsed: + logger.warning("parsed[%s]()=%d does not contain element 'routeData'", type(parsed), len(parsed)) + continue + elif "federatedInstancesResponse" not in parsed["routeData"]: + logger.warning("parsed[routeData][%s]()=%d does not contain element 'federatedInstancesResponse'", type(parsed["routeData"]), len(parsed["routeData"])) + continue + elif "data" not in parsed["routeData"]["federatedInstancesResponse"]: + logger.warning("parsed[routeData][federatedInstancesResponse][%s]()=%d does not contain element 'data'", type(parsed["routeData"]["federatedInstancesResponse"]), len(parsed["routeData"]["federatedInstancesResponse"])) + continue + elif "federated_instances" not in parsed["routeData"]["federatedInstancesResponse"]["data"]: + logger.warning("parsed[routeData][federatedInstancesResponse][data][%s]()=%d does not contain element 'data'", type(parsed["routeData"]["federatedInstancesResponse"]["data"]), len(parsed["routeData"]["federatedInstancesResponse"]["data"])) + continue + + data = parsed["routeData"]["federatedInstancesResponse"]["data"]["federated_instances"] + logger.debug("Checking %d data elements ...", len(data)) + for element in data: + logger.debug("element='%s'", element) + if isinstance(only, str) and only != element: + logger.debug("Skipping unwanted element='%s',only='%s'", element, only) + continue + + logger.debug("Checking data[%s]()=%d row(s) ...", element, len(data[element])) + for row in data[element]: + logger.debug("row[]='%s'", type(row)) + if "domain" not in row: + logger.warning("row()=%d has no element 'domain' - SKIPPED!", len(row)) + continue + + logger.debug("row[domain]='%s' - BEFORE!", row["domain"]) + peer = tidyup.domain(row["domain"]) + logger.debug("peer='%s' - AFTER!", peer) + + if peer == "": + logger.debug("peer is empty - SKIPPED!") + continue + elif not utils.is_domain_wanted(peer): + logger.debug("peer='%s' is not wanted - SKIPPED!", peer) + continue + elif peer in peers: + logger.debug("peer='%s' already added - SKIPPED!", peer) + continue + + logger.debug("Appending peer='%s' ...", peer) + peers.append(peer) + + logger.debug("peers()=%d - EXIT!", len(peers)) + return peers -- 2.39.5