git.mxchange.org Git - fba.git/commitdiff
Continued:
author    Roland Häder <roland@mxchange.org>
          Tue, 11 Jul 2023 04:42:34 +0000 (06:42 +0200)
committer Roland Häder <roland@mxchange.org>
          Tue, 11 Jul 2023 04:42:34 +0000 (06:42 +0200)
- blacklisted hexbear.net as their JavaScript contains shell commands plus broken
  JSON inside that script
- added parsing of JSON from JavaScript starting with 'window.isoData'
  (encapsulated in function parse_script())
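
For context, the matching <script> tag carries a JavaScript assignment, not plain
JSON. A minimal Python sketch of the cleanup parse_script() performs (the payload
and domain below are made up; only the key names mirror the code):

    import json

    # Illustrative contents of the matching <script> tag (domain made up,
    # key names taken from parse_script() below):
    raw = 'window.isoData = {"routeData":{"federatedInstancesResponse":{"data":{"federated_instances":{"blocked":[{"domain":"blocked.example"}],"allowed":undefined}}}}}'

    # Same cleanup parse_script() applies: take everything after the first "=",
    # then make the bare JavaScript "undefined" JSON-safe before json.loads().
    iso_data = raw.split("=", 1)[1].strip().replace(":undefined", ":\"undefined\"")
    parsed = json.loads(iso_data)
    print(parsed["routeData"]["federatedInstancesResponse"]["data"]["federated_instances"])
    # {'blocked': [{'domain': 'blocked.example'}], 'allowed': 'undefined'}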

fba/helpers/blacklist.py
fba/networks/lemmy.py

diff --git a/fba/helpers/blacklist.py b/fba/helpers/blacklist.py
index 18fda14e28c284443a7fe5fae992413f4004005d..12c1655ca4075e1e6a30974cdafa81fed42bf312 100644
--- a/fba/helpers/blacklist.py
+++ b/fba/helpers/blacklist.py
@@ -42,6 +42,8 @@ blacklist = [
     "netlify.app",
     # block flooder
     "everyoneattack.com",
+    # CSRF
+    "hexbear.net", # See script in /instances
 ]
 
 def is_blacklisted(domain: str) -> bool:
diff --git a/fba/networks/lemmy.py b/fba/networks/lemmy.py
index fc4def05bfdbea9af46b5e55032ff1f2f9b49bfd..be1b12de7d3af04af6c7ee33e777b98ef9c6fd39 100644
--- a/fba/networks/lemmy.py
+++ b/fba/networks/lemmy.py
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+import json
 import logging
 
 import bs4
@@ -145,7 +146,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
 
             headers = doc.findAll("div", {"class": "home-instances container-lg"})
             found = None
-            logger.debug("Search in %d header(s) ...", len(headers))
+            logger.debug("Checking %d header(s) ...", len(headers))
             for header in headers:
                 logger.debug("header[]='%s'", type(header))
                 content = header.contents[0]
@@ -164,8 +165,11 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
 
             logger.debug("found[]='%s'", type(found))
             if found is None:
-                logger.debug("domain='%s' is not blocking any instances - EXIT!", domain)
-                return blocklist
+                logger.info("domain='%s' has no HTML blocklist, checking scripts ...", domain)
+                blocking = parse_script(doc, "blocked")
+
+                logger.debug("blocking()=%d - EXIT!", len(blocking))
+                return blocking
 
             blocking = found.find_next(["ul","table"]).findAll("a")
             logger.debug("Found %d blocked instance(s) ...", len(blocking))
@@ -245,7 +249,12 @@ def fetch_instances(domain: str, origin: str) -> list:
                     logger.debug("Appending peer='%s' ...", peer)
                     peers.append(peer)
 
-        logger.debug("Marking domain='%s' as successfully handled ...", domain)
+            logger.debug("peers()=%d", len(peers))
+            if len(peers) == 0:
+                logger.debug("Found no peers for domain='%s', trying script tag ...", domain)
+                peers = parse_script(doc)
+
+        logger.debug("Marking domain='%s' as successfully handled, peers()=%d ...", domain, len(peers))
         instances.set_success(domain)
 
     except network.exceptions as exception:
@@ -254,3 +263,87 @@ def fetch_instances(domain: str, origin: str) -> list:
 
     logger.debug("peers()=%d - EXIT!", len(peers))
     return peers
+
+def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
+    logger.debug("doc[]='%s',only='%s' - CALLED!")
+    if not isinstance(doc, bs4.BeautifulSoup):
+        raise ValueError(f"Parameter doc[]='{type(only)}' is not of type 'bs4.BeautifulSoup'")
+    elif only is not None and not isinstance(only, str):
+        raise ValueError(f"Parameter only[]='{type(only)}' is not of type 'str'")
+    elif isinstance(only, str) and only == "":
+        raise ValueError("Parameter 'only' is empty")
+
+    scripts = doc.find_all("script")
+    peers = list()
+
+    logger.debug("scripts()=%d", len(scripts))
+    for script in scripts:
+        logger.debug("script[%s].contents()=%d", type(script), len(script.contents))
+        if len(script.contents) == 0:
+            logger.debug("script has no contents - SKIPPED!")
+            continue
+        elif not script.contents[0].startswith("window.isoData"):
+            logger.debug("script.contents[0]='%s' does not start with window.isoData - SKIPPED!", script.contents[0])
+            continue
+
+        logger.debug("script.contents[0][]='%s'", type(script.contents[0]))
+
+        isoData = script.contents[0].split("=", 1)[1].strip().replace(":undefined", ":\"undefined\"")
+        logger.debug("isoData[%s]='%s'", type(isoData), isoData)
+
+        parsed = None
+        try:
+            parsed = json.loads(isoData)
+        except json.decoder.JSONDecodeError as exception:
+            logger.warning("Exception '%s' during parsing %d Bytes: '%s'", type(exception), len(isoData), str(exception))
+            return list()
+
+        logger.debug("parsed[%s]()=%d", type(parsed), len(parsed))
+
+        if "routeData" not in parsed:
+            logger.warning("parsed[%s]()=%d does not contain element 'routeData'", type(parsed), len(parsed))
+            continue
+        elif "federatedInstancesResponse" not in parsed["routeData"]:
+            logger.warning("parsed[routeData][%s]()=%d does not contain element 'federatedInstancesResponse'", type(parsed["routeData"]), len(parsed["routeData"]))
+            continue
+        elif "data" not in parsed["routeData"]["federatedInstancesResponse"]:
+            logger.warning("parsed[routeData][federatedInstancesResponse][%s]()=%d does not contain element 'data'", type(parsed["routeData"]["federatedInstancesResponse"]), len(parsed["routeData"]["federatedInstancesResponse"]))
+            continue
+        elif "federated_instances" not in parsed["routeData"]["federatedInstancesResponse"]["data"]:
+            logger.warning("parsed[routeData][federatedInstancesResponse][data][%s]()=%d does not contain element 'data'", type(parsed["routeData"]["federatedInstancesResponse"]["data"]), len(parsed["routeData"]["federatedInstancesResponse"]["data"]))
+            continue
+
+        data = parsed["routeData"]["federatedInstancesResponse"]["data"]["federated_instances"]
+        logger.debug("Checking %d data elements ...", len(data))
+        for element in data:
+            logger.debug("element='%s'", element)
+            if isinstance(only, str) and only != element:
+                logger.debug("Skipping unwanted element='%s',only='%s'", element, only)
+                continue
+
+            logger.debug("Checking data[%s]()=%d row(s) ...", element, len(data[element]))
+            for row in data[element]:
+                logger.debug("row[]='%s'", type(row))
+                if "domain" not in row:
+                    logger.warning("row()=%d has no element 'domain' - SKIPPED!", len(row))
+                    continue
+
+                logger.debug("row[domain]='%s' - BEFORE!", row["domain"])
+                peer = tidyup.domain(row["domain"])
+                logger.debug("peer='%s' - AFTER!", peer)
+
+                if peer == "":
+                    logger.debug("peer is empty - SKIPPED!")
+                    continue
+                elif not utils.is_domain_wanted(peer):
+                    logger.debug("peer='%s' is not wanted - SKIPPED!", peer)
+                    continue
+                elif peer in peers:
+                    logger.debug("peer='%s' already added - SKIPPED!", peer)
+                    continue
+
+                logger.debug("Appending peer='%s' ...", peer)
+                peers.append(peer)
+
+    logger.debug("peers()=%d - EXIT!", len(peers))
+    return peers
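
A minimal usage sketch mirroring the two call sites above (response as a
fetched-page object and the "html.parser" choice are assumptions, not taken
from this commit):

    doc = bs4.BeautifulSoup(response.text, "html.parser")

    # fallback used by fetch_instances(): all federated instances
    peers = parse_script(doc)

    # fallback used by fetch_blocks(): only the blocklist
    blocking = parse_script(doc, "blocked")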