]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Sun, 12 Jan 2025 14:08:41 +0000 (15:08 +0100)
committerRoland Häder <roland@mxchange.org>
Sun, 12 Jan 2025 14:08:41 +0000 (15:08 +0100)
- need to check if 'row' is type of 'dict'
- logger messages improved or added

fba/commands.py
fba/models/instances.py
fba/networks/lemmy.py

index a9df12d6df0c16b14786381a1419f132b3054ec9..ee643276dc5723b6eee3fa7969db646a5a91e86d 100644 (file)
@@ -326,6 +326,7 @@ def fetch_blocks(args: argparse.Namespace) -> int:
         instances.set_has_obfuscation(row["domain"], False)
 
         # c.s isn't part of oliphant's "hidden" blocklists
+        logger.debug("row[domain]='%s',row[software]='%s'", row["domain"], row["software"])
         if row["domain"] == "chaos.social" or software_helper.is_relay(row["software"]) or blocklists.has(row["domain"]):
             logger.debug("Skipping row[domain]='%s', run ./fba.py fetch_cs, fetch_oliphant, fetch_csv instead!", row["domain"])
             continue
@@ -365,13 +366,13 @@ def fetch_blocks(args: argparse.Namespace) -> int:
             logger.debug("blocked='%s',block_level='%s',reason='%s'", block["blocked"], block["block_level"], block["reason"])
 
             if block["block_level"] in [None, ""]:
-                logger.warning("block_level is empty, row[domain]='%s',blocked='%s'", block["blocker"], block["blocked"])
+                logger.warning("block_level='%s' is empty, row[domain]='%s',block[blocked]='%s'", block_level, block["blocker"], block["blocked"])
                 continue
 
-            logger.debug("blocked='%s',reason='%s' - BEFORE!", block["blocked"], block["reason"])
+            logger.debug("block[blocked]='%s',block[reason]='%s' - BEFORE!", block["blocked"], block["reason"])
             block["blocked"] = tidyup.domain(block["blocked"])
             block["reason"]  = tidyup.reason(block["reason"]) if block["reason"] is not None and block["reason"] != "" else None
-            logger.debug("blocked='%s',reason='%s' - AFTER!", block["blocked"], block["reason"])
+            logger.debug("block[blocked]='%s',block[reason]='%s' - AFTER!", block["blocked"], block["reason"])
 
             if block["blocked"] in [None, ""]:
                 logger.warning("block[blocked]='%s' is empty, row[domain]='%s'", block["blocked"], row["domain"])
@@ -1386,7 +1387,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
         logger.debug("Setting has_obfuscation=False for row[domain]='%s' ...", row["domain"])
         instances.set_has_obfuscation(row["domain"], False)
 
-        logger.debug("blocking()=%d", len(blocking))
+        logger.debug("blocking()=%d - BEFORE!", len(blocking))
         if len(blocking) == 0:
             logger.debug("Empty blocking list, trying individual fetch_blocks() for row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"])
             if row["software"] == "pleroma":
@@ -1407,8 +1408,10 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
             else:
                 logger.warning("Unknown software: row[domain]='%s',row[software]='%s'", row["domain"], row["software"])
 
+        logger.debug("blocking()=%d - AFTER!", len(blocking))
+
         # c.s isn't part of oliphant's "hidden" blocklists
-        logger.debug("row[domain]='%s'", row["domain"])
+        logger.debug("row[domain]='%s',row[software]='%s'", row["domain"], row["software"])
         if row["domain"] != "chaos.social" and row["software"] is not None and not software_helper.is_relay(row["software"]) and not blocklists.has(row["domain"]):
             logger.debug("Invoking instances.set_total_blocks(%s,%d) ...", row["domain"], len(blocking))
             instances.set_last_blocked(row["domain"])
index 43b57bb52a2b74b9b4ec294a7497ef06f206b7ae..2f39d860590a6e7501508fa3b68232894c41ed35 100644 (file)
@@ -528,15 +528,15 @@ def set_detection_mode(domain: str, mode: str) -> None:
     _set_data("detection_mode", domain, mode)
     logger.debug("EXIT!")
 
-def set_has_obfuscation(domain: str, status: bool) -> None:
-    logger.debug("domain='%s',status='%s' - CALLED!", domain, status)
+def set_has_obfuscation(domain: str, has_obfuscation: bool) -> None:
+    logger.debug("domain='%s',has_obfuscation='%s' - CALLED!", domain, has_obfuscation)
     domain_helper.raise_on(domain)
 
-    if not isinstance(status, bool):
-        raise ValueError(f"Parameter status[]='{type(status)}' is not of type 'bool'")
+    if not isinstance(has_obfuscation, bool):
+        raise ValueError(f"Parameter has_obfuscation[]='{type(has_obfuscation)}' is not of type 'bool'")
 
     # Set timestamp
-    _set_data("has_obfuscation", domain, status)
+    _set_data("has_obfuscation", domain, has_obfuscation)
     logger.debug("EXIT!")
 
 def set_original_software(domain: str, software: str) -> None:
@@ -587,21 +587,17 @@ def valid(value: str, column: str) -> bool:
     logger.debug("is_valid='%s' - EXIT!", is_valid)
     return is_valid
 
-def delete(domain: str, column: str = "domain") -> None:
-    logger.debug("domain='%s, column='%s'' - CALLED!", domain, column)
+def delete(search: str, column: str = "domain") -> None:
+    logger.debug("search='%s, column='%s'' - CALLED!", search, column)
 
-    if not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='{type(domain)}' is not of type 'str'")
-    elif domain == "":
-        raise ValueError("Parameter 'domain' is empty")
-    elif not isinstance(column, str):
+    if not isinstance(column, str):
         raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
     elif column == "":
         raise ValueError("Parameter 'column' is empty")
     elif column in ["domain", "origin"]:
-        domain_helper.raise_on(domain)
+        domain_helper.raise_on(search)
 
-    database.cursor.execute(f"DELETE FROM instances WHERE {column} = ? LIMIT 1", [domain])
+    database.cursor.execute(f"DELETE FROM instances WHERE {column} = ?", [search])
 
     logger.debug("Invoking commit() ...")
     database.connection.commit()
index 35ba0c3a4fe7df4f4199bfcef8f85385a768d543..4b2929ec0ff50409444dce2e2bc0b29a2b88a3c4 100644 (file)
@@ -318,7 +318,7 @@ def fetch_instances(domain: str, origin: str) -> list:
     return peers
 
 def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
-    logger.debug("doc[]='%s',only='%s' - CALLED!")
+    logger.debug("doc[]='%s',only='%s' - CALLED!", type(doc), only)
 
     if not isinstance(doc, bs4.BeautifulSoup):
         raise ValueError(f"Parameter doc[]='{type(only)}' is not of type 'bs4.BeautifulSoup'")
@@ -347,6 +347,7 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
 
         parsed = None
         try:
+            logger.debug("Parsing %d Bytes long JSON string ...", len(iso_data))
             parsed = json.loads(iso_data)
         except json.decoder.JSONDecodeError as exception:
             logger.warning("Exception '%s' during parsing %d Bytes: '%s' - EXIT!", type(exception), len(iso_data), str(exception))
@@ -370,7 +371,7 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
         data = parsed["routeData"]["federatedInstancesResponse"]["data"]["federated_instances"]
         logger.debug("Checking %d data elements ...", len(data))
         for element in data:
-            logger.debug("element[%s]='%s'", type(element), element)
+            logger.debug("element[%s]='%s',only='%s'", type(element), element, only)
             if isinstance(only, str) and only != element:
                 logger.debug("Skipping unwanted element='%s',only='%s'", element, only)
                 continue
@@ -378,7 +379,10 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
             logger.debug("Checking data[%s]()=%d row(s) ...", element, len(data[element]))
             for row in data[element]:
                 logger.debug("row[]='%s'", type(row))
-                if "domain" not in row:
+                if not isinstance(row, dict):
+                    logger.warning("row[]='%s' is not of expected type 'dict' - SKIPPED!", type(row))
+                    continue
+                elif "domain" not in row:
                     logger.warning("row()=%d has no element 'domain' - SKIPPED!", len(row))
                     continue
                 elif row["domain"] in [None, ""]: