]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Mon, 13 Jan 2025 23:40:06 +0000 (00:40 +0100)
committerRoland Häder <roland@mxchange.org>
Mon, 13 Jan 2025 23:40:06 +0000 (00:40 +0100)
- avoided dangerous (=mutable) argument to functions/methods (thanks to pylint)
- reduced invocation count of find_all("foo") by using local variable
- added more checks in "quarantined" branch

fba/http/network.py
fba/networks/friendica.py
fba/networks/pleroma.py

index e1117fcc4db1c90406985ac41e9422db6eefa58d..829e0f5b38e794a48414f97a0cb7d47f5a8383b4 100644 (file)
@@ -63,7 +63,7 @@ logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 #logger.setLevel(logging.DEBUG)
 
-def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict:
+def post_json_api(domain: str, path: str, data: str = "", headers: dict = None) -> dict:
     logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers))
     domain_helper.raise_on(domain)
 
@@ -77,8 +77,8 @@ def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) ->
         raise ValueError(f"path='{path}' does not start with / but should")
     elif not isinstance(data, str):
         raise ValueError(f"data[]='{type(data)}' is not of type 'str'")
-    elif not isinstance(headers, dict):
-        raise ValueError(f"headers[]='{type(headers)}' is not of type 'list'")
+    elif headers is not None and not isinstance(headers, dict):
+        raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'")
 
     json_reply = {
         "status_code": 200,
@@ -190,7 +190,7 @@ def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict:
 
     try:
         logger.debug("Sending GET to domain='%s',path='%s',timeout(%d)='%s'", domain, path, len(timeout), timeout)
-        response = fetch_response(domain, path, {**api_headers, **headers}, timeout)
+        response = _fetch_response(domain, path, {**api_headers, **headers}, timeout)
     except exceptions as exception:
         logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
         json_reply["status_code"]   = 999
@@ -278,7 +278,7 @@ def send_bot_post(domain: str, blocklist: list) -> None:
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
     return response.ok and response.status_code == 200 and response.text.strip() != ""
 
-def fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response:
+def _fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response:
     logger.debug("domain='%s',path='%s',headers()=%d,timeout='%s',allow_redirects='%s' - CALLED!", domain, path, len(headers), timeout, allow_redirects)
     domain_helper.raise_on(domain)
 
@@ -349,7 +349,7 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
     logger.debug("components[%s]='%s'", type(components), components)
     if components.query != "":
         logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc)
-        response = fetch_response(
+        response = _fetch_response(
             components.netloc.split(":")[0],
             f"{components.path}?{components.query}",
             headers=headers,
@@ -357,7 +357,7 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
         )
     else:
         logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
-        response = fetch_response(
+        response = _fetch_response(
             components.netloc.split(":")[0],
             components.path if isinstance(components.path, str) and components.path != '' else '/',
             headers=headers,
@@ -367,7 +367,7 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon
     logger.debug("response[]='%s' - EXIT!", type(response))
     return response
 
-def fetch_json_rows(hostname: str, path: str, headers: dict = {}, rows_key: str = None):
+def fetch_json_rows(hostname: str, path: str, headers: dict = None, rows_key: str = None):
     logger.debug("hostname='%s',path='%s',headers()=%d,rows_key='%s' - CALLED!", hostname, path, len(headers), rows_key)
 
     if not isinstance(hostname, str):
@@ -382,7 +382,7 @@ def fetch_json_rows(hostname: str, path: str, headers: dict = {}, rows_key: str
         raise ValueError("Parameter 'path' is an empty string")
     elif not path.startswith("/"):
         raise ValueError(f"path='{path}' does not start with a slash")
-    elif not isinstance(headers, dict):
+    elif headers is not None and not isinstance(headers, dict):
         raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'")
     elif not isinstance(rows_key, str) and rows_key is not None:
         raise ValueError(f"rows_key[]='{type(rows_key)}' is not of type 'str'")
@@ -434,7 +434,7 @@ def get_generic(domain: str, path: str, allow_redirects: bool = False) -> reques
         raise ValueError(f"allow_redirects[]='{type(allow_redirects)}' is not of type 'bool'")
 
     logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
-    response = fetch_response(
+    response = _fetch_response(
         domain,
         path,
         headers=web_headers,
index d057a27e20e1a0be8f0db8d3ed54e43de5a8c665..fcf9e5c1d14e2fe99b0fd906665449b07234f26c 100644 (file)
@@ -67,22 +67,28 @@ def fetch_blocks(domain: str) -> list:
         logger.debug("Instance has no block list: domain='%s' - EXIT!", domain)
         return []
 
+    # Init local variables
+    rows = ()
+
+    # Try to find table
     table = block_tag.find("table")
 
     logger.debug("table[]='%s'", type(table))
     if table is None:
         logger.warning("domain='%s' has no table tag - EXIT !", domain)
         return []
-    elif table.find("tbody"):
-        rows = table.find("tbody").find_all("tr")
-    else:
-        rows = table.find_all("tr")
 
-    logger.debug("Found rows()=%d", len(rows))
+    # Find all rows in table
+    rows = table.find_all("tr")
+
+    logger.debug("Found rows[%s]()=%d", type(rows), len(rows))
     for line in rows:
         logger.debug("line[%s]='%s'", type(line), line)
-        blocked = line.find_all("td")[0].text
-        reason  = line.find_all("td")[1].text
+        tds = line.find_all("td")
+
+        logger.debug("tds[%s]()=%d", type(tds), len(tds))
+        blocked = tds[0].text.strip()
+        reason  = tds[1].text.strip()
 
         logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason)
         blocked = tidyup.domain(blocked) if blocked != "" else None
index fc92f888e1921ad2d30ae80ff18aadd7676d5ab8..5ca1a93dfee014a84b75b0b787c39bf28b26766b 100644 (file)
@@ -94,7 +94,7 @@ def fetch_blocks(domain: str) -> list:
         logger.warning("rows()=%d does not have key 'metadata', domain='%s' - EXIT!", len(rows), domain)
         return []
     elif "federation" not in rows["metadata"]:
-        logger.warning("rows()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain)
+        logger.warning("rows[metadata]()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain)
         return []
 
     found = False
@@ -159,7 +159,13 @@ def fetch_blocks(domain: str) -> list:
             if blocked in [None, ""]:
                 logger.debug("blocked[%s]='%s' is empty after tidyup.domain(): domain='%s',block_level='%s'", type(blocked), blocked, domain, block_level)
                 continue
-            elif not domain_helper.is_wanted(blocked):
+            elif not domain_helper.is_tld_wanted(blocked):
+                logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
+                continue
+            elif validators.domain(blocked, rfc_2782=True) and blacklist.is_blacklisted(blocked):
+                logger.debug("blocked='%s' is blacklisted - SKIPPED!")
+                continue
+            elif blocked.find("*") == -1 and blocked.find("?") == -1 and not domain_helper.is_wanted(blocked):
                 logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
                 continue
 
@@ -367,8 +373,11 @@ def fetch_blocks_from_about(domain: str) -> dict:
             logger.debug("Found block_level='%s', importing domain blocks ...", block_level)
             for line in header.find_next("table").find_all("tr")[1:]:
                 logger.debug("line[]='%s'", type(line))
-                blocked = line.find_all("td")[0].text
-                reason  = line.find_all("td")[1].text
+                tds = line.find_all("td")
+
+                logger.debug("tds[%s]()=%d", type(tds) len(tds))
+                blocked = tds[0].text
+                reason  = tds[1].text
 
                 logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason)
                 blocked = tidyup.domain(blocked) if blocked != "" else None