From 1477b5c3b0e11f9a80b5e2dd17eb8e8531e2c3e2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Tue, 14 Jan 2025 00:40:06 +0100 Subject: [PATCH] Continued: - avoided dangerous (=mutable) argument to functions/methods (thanks to pylint) - reduced invocation count of find_all("foo") by using local variable - added more checks in "quarantined" branch --- fba/http/network.py | 20 ++++++++++---------- fba/networks/friendica.py | 20 +++++++++++++------- fba/networks/pleroma.py | 17 +++++++++++++---- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/fba/http/network.py b/fba/http/network.py index e1117fc..829e0f5 100644 --- a/fba/http/network.py +++ b/fba/http/network.py @@ -63,7 +63,7 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) #logger.setLevel(logging.DEBUG) -def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict: +def post_json_api(domain: str, path: str, data: str = "", headers: dict = None) -> dict: logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers)) domain_helper.raise_on(domain) @@ -77,8 +77,8 @@ def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> raise ValueError(f"path='{path}' does not start with / but should") elif not isinstance(data, str): raise ValueError(f"data[]='{type(data)}' is not of type 'str'") - elif not isinstance(headers, dict): - raise ValueError(f"headers[]='{type(headers)}' is not of type 'list'") + elif headers is not None and not isinstance(headers, dict): + raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'") json_reply = { "status_code": 200, @@ -190,7 +190,7 @@ def get_json_api(domain: str, path: str, headers: dict, timeout: tuple) -> dict: try: logger.debug("Sending GET to domain='%s',path='%s',timeout(%d)='%s'", domain, path, len(timeout), timeout) - response = fetch_response(domain, path, {**api_headers, **headers}, timeout) + response = _fetch_response(domain, path, {**api_headers, **headers}, timeout) except exceptions as exception: logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception)) json_reply["status_code"] = 999 @@ -278,7 +278,7 @@ def send_bot_post(domain: str, blocklist: list) -> None: logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text)) return response.ok and response.status_code == 200 and response.text.strip() != "" -def fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response: +def _fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response: logger.debug("domain='%s',path='%s',headers()=%d,timeout='%s',allow_redirects='%s' - CALLED!", domain, path, len(headers), timeout, allow_redirects) domain_helper.raise_on(domain) @@ -349,7 +349,7 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon logger.debug("components[%s]='%s'", type(components), components) if components.query != "": logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc) - response = fetch_response( + response = _fetch_response( components.netloc.split(":")[0], f"{components.path}?{components.query}", headers=headers, @@ -357,7 +357,7 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon ) else: logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc) - response = fetch_response( + response = _fetch_response( components.netloc.split(":")[0], components.path if isinstance(components.path, str) and components.path != '' else '/', headers=headers, @@ -367,7 +367,7 @@ def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Respon logger.debug("response[]='%s' - EXIT!", type(response)) return response -def fetch_json_rows(hostname: str, path: str, headers: dict = {}, rows_key: str = None): +def fetch_json_rows(hostname: str, path: str, headers: dict = None, rows_key: str = None): logger.debug("hostname='%s',path='%s',headers()=%d,rows_key='%s' - CALLED!", hostname, path, len(headers), rows_key) if not isinstance(hostname, str): @@ -382,7 +382,7 @@ def fetch_json_rows(hostname: str, path: str, headers: dict = {}, rows_key: str raise ValueError("Parameter 'path' is an empty string") elif not path.startswith("/"): raise ValueError(f"path='{path}' does not start with a slash") - elif not isinstance(headers, dict): + elif headers is not None and not isinstance(headers, dict): raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'") elif not isinstance(rows_key, str) and rows_key is not None: raise ValueError(f"rows_key[]='{type(rows_key)}' is not of type 'str'") @@ -434,7 +434,7 @@ def get_generic(domain: str, path: str, allow_redirects: bool = False) -> reques raise ValueError(f"allow_redirects[]='{type(allow_redirects)}' is not of type 'bool'") logger.debug("Fetching path='%s' from domain='%s' ...", path, domain) - response = fetch_response( + response = _fetch_response( domain, path, headers=web_headers, diff --git a/fba/networks/friendica.py b/fba/networks/friendica.py index d057a27..fcf9e5c 100644 --- a/fba/networks/friendica.py +++ b/fba/networks/friendica.py @@ -67,22 +67,28 @@ def fetch_blocks(domain: str) -> list: logger.debug("Instance has no block list: domain='%s' - EXIT!", domain) return [] + # Init local variables + rows = () + + # Try to find table table = block_tag.find("table") logger.debug("table[]='%s'", type(table)) if table is None: logger.warning("domain='%s' has no table tag - EXIT !", domain) return [] - elif table.find("tbody"): - rows = table.find("tbody").find_all("tr") - else: - rows = table.find_all("tr") - logger.debug("Found rows()=%d", len(rows)) + # Find all rows in table + rows = table.find_all("tr") + + logger.debug("Found rows[%s]()=%d", type(rows), len(rows)) for line in rows: logger.debug("line[%s]='%s'", type(line), line) - blocked = line.find_all("td")[0].text - reason = line.find_all("td")[1].text + tds = line.find_all("td") + + logger.debug("tds[%s]()=%d", type(tds), len(tds)) + blocked = tds[0].text.strip() + reason = tds[1].text.strip() logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason) blocked = tidyup.domain(blocked) if blocked != "" else None diff --git a/fba/networks/pleroma.py b/fba/networks/pleroma.py index fc92f88..5ca1a93 100644 --- a/fba/networks/pleroma.py +++ b/fba/networks/pleroma.py @@ -94,7 +94,7 @@ def fetch_blocks(domain: str) -> list: logger.warning("rows()=%d does not have key 'metadata', domain='%s' - EXIT!", len(rows), domain) return [] elif "federation" not in rows["metadata"]: - logger.warning("rows()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain) + logger.warning("rows[metadata]()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain) return [] found = False @@ -159,7 +159,13 @@ def fetch_blocks(domain: str) -> list: if blocked in [None, ""]: logger.debug("blocked[%s]='%s' is empty after tidyup.domain(): domain='%s',block_level='%s'", type(blocked), blocked, domain, block_level) continue - elif not domain_helper.is_wanted(blocked): + elif not domain_helper.is_tld_wanted(blocked): + logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) + continue + elif validators.domain(blocked, rfc_2782=True) and blacklist.is_blacklisted(blocked): + logger.debug("blocked='%s' is blacklisted - SKIPPED!") + continue + elif blocked.find("*") == -1 and blocked.find("?") == -1 and not domain_helper.is_wanted(blocked): logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked) continue @@ -367,8 +373,11 @@ def fetch_blocks_from_about(domain: str) -> dict: logger.debug("Found block_level='%s', importing domain blocks ...", block_level) for line in header.find_next("table").find_all("tr")[1:]: logger.debug("line[]='%s'", type(line)) - blocked = line.find_all("td")[0].text - reason = line.find_all("td")[1].text + tds = line.find_all("td") + + logger.debug("tds[%s]()=%d", type(tds) len(tds)) + blocked = tds[0].text + reason = tds[1].text logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason) blocked = tidyup.domain(blocked) if blocked != "" else None -- 2.39.5