]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Tue, 11 Jul 2023 11:48:05 +0000 (13:48 +0200)
committerRoland Häder <roland@mxchange.org>
Tue, 11 Jul 2023 11:48:31 +0000 (13:48 +0200)
- moved utils.alias_block_level() to blocks model
- fixed some pylint issues

daemon.py
fba/commands.py
fba/helpers/domain.py
fba/models/blocks.py
fba/networks/lemmy.py
fba/networks/pleroma.py
fba/utils.py

index bef82a13871446ff5fc43ea5d6acb8d76334142b..4e22ce97602b3c8ba38bdc716da693cd47d2c5f1 100755 (executable)
--- a/daemon.py
+++ b/daemon.py
@@ -375,7 +375,7 @@ def top(request: Request, mode: str, value: str, amount: int = config.get("api_l
     })
 
 @router.get(config.get("base_url") + "/infos")
-def rss(request: Request, domain: str):
+def infos(request: Request, domain: str):
     if domain is None:
         raise HTTPException(status_code=400, detail="Invalid request, parameter 'domain' missing")
 
index ffb4ae736ca6d7009c3a4c94efe0c35ca3e8969a..a58440b77dff126cf6b4978f13530c699780877d 100644 (file)
@@ -416,7 +416,7 @@ def fetch_blocks(args: argparse.Namespace) -> int:
                 logger.debug("Hash wasn't found, adding: blocked='%s',blocker='%s'", block["blocked"], blocker)
                 federation.fetch_instances(block["blocked"], blocker, None, inspect.currentframe().f_code.co_name)
 
-            block["block_level"] = utils.alias_block_level(block["block_level"])
+            block["block_level"] = blocks.alias_block_level(block["block_level"])
 
             if processing.block(blocker, block["blocked"], block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
                 logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], blocker)
@@ -1045,9 +1045,9 @@ def fetch_oliphant(args: argparse.Namespace) -> int:
                 continue
 
             if "#severity" in row:
-                severity = utils.alias_block_level(row["#severity"])
+                severity = blocks.alias_block_level(row["#severity"])
             elif "severity" in row:
-                severity = utils.alias_block_level(row["severity"])
+                severity = blocks.alias_block_level(row["severity"])
             else:
                 logger.debug("row='%s' does not contain severity column", row)
                 continue
@@ -1492,7 +1492,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
                     logger.debug("blocked='%s' is already blocked by domain='%s' - SKIPPED!", blocked, row["domain"])
                     continue
 
-                block["block_level"] = utils.alias_block_level(block["block_level"])
+                block["block_level"] = blocks.alias_block_level(block["block_level"])
 
                 logger.info("blocked='%s' has been deobfuscated to blocked='%s', adding ...", block["blocked"], blocked)
                 if processing.block(row["domain"], blocked, block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
@@ -1614,7 +1614,7 @@ def update_nodeinfo(args: argparse.Namespace) -> int:
             software = federation.determine_software(row["domain"])
 
             logger.debug("Determined software='%s'", software)
-            if (software != row["software"] and software is not None) or args.force == True:
+            if (software != row["software"] and software is not None) or args.force is True:
                 logger.warning("Software type for row[domain]='%s' has changed from '%s' to '%s'!", row["domain"], row["software"], software)
                 instances.set_software(row["domain"], software)
 
index 5328e3f37eeddc8a25043b390249704a7486de82..1830e3f41243a0926c6dd9c4485d8f074d49d41e 100644 (file)
@@ -55,7 +55,7 @@ def is_in_url(domain: str, url: str) -> bool:
     punycode = domain.encode("idna").decode("utf-8")
 
     logger.debug("components[]='%s',punycode='%s'", type(components), punycode)
-    is_found = (punycode == components.netloc or punycode == components.hostname)
+    is_found = (punycode in [components.netloc, components.hostname])
 
     logger.debug("is_found='%s' - EXIT!", is_found)
     return is_found
index 3a181529d0a03038d3a60004ddf1f45bb67df3f4..2b484d5c7fb90bf1bdbcb8bae035c1463b025c89 100644 (file)
@@ -223,3 +223,26 @@ def translate_idnas(rows: list, column: str):
             database.connection.commit()
 
     logger.debug("EXIT!")
+
+def alias_block_level(block_level: str) -> str:
+    logger.debug("block_level='%s' - CALLED!", block_level)
+    if not isinstance(block_level, str):
+        raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
+    elif block_level == "":
+        raise ValueError("Parameter 'block_level' is empty")
+
+    if block_level == "silence":
+        logger.debug("Block level 'silence' has been changed to 'silenced'")
+        block_level = "silenced"
+    elif block_level == "suspend":
+        logger.debug("Block level 'suspend' has been changed to 'suspended'")
+        block_level = "suspended"
+    elif block_level == "nsfw":
+        logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
+        block_level = "media_nsfw"
+    elif block_level == "quarantined_instances":
+        logger.debug("Block level 'quarantined_instances' has been changed to 'quarantined'")
+        block_level = "quarantined"
+
+    logger.debug("block_level='%s' - EXIT!", block_level)
+    return block_level
index 5700742258697be57a44c4b63b97c9b75e56ea2d..c64ae477f1ee055703c9820ec6e5afb6d80144b7 100644 (file)
@@ -308,14 +308,14 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
 
         logger.debug("script.contents[0][]='%s'", type(script.contents[0]))
 
-        isoData = script.contents[0].split("=")[1].strip().replace(":undefined", ":\"undefined\"")
-        logger.debug("isoData[%s]='%s'", type(isoData), isoData)
+        iso_data = script.contents[0].split("=")[1].strip().replace(":undefined", ":\"undefined\"")
+        logger.debug("iso_data[%s]='%s'", type(iso_data), iso_data)
 
         parsed = None
         try:
-            parsed = json.loads(isoData)
+            parsed = json.loads(iso_data)
         except json.decoder.JSONDecodeError as exception:
-            logger.warning("Exception '%s' during parsing %d Bytes: '%s'", type(exception), len(isoData), str(exception))
+            logger.warning("Exception '%s' during parsing %d Bytes: '%s'", type(exception), len(iso_data), str(exception))
             return list()
 
         logger.debug("parsed[%s]()=%d", type(parsed), len(parsed))
index f7964be4e486d835d89c0bb3455a34c93492d572..e49473f010367874103b0fdb30d35c06608e31ca 100644 (file)
@@ -28,6 +28,7 @@ from fba.helpers import tidyup
 from fba.http import federation
 from fba.http import network
 
+from fba.models import blocks
 from fba.models import instances
 
 logging.basicConfig(level=logging.INFO)
@@ -114,7 +115,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 logger.debug("domain='%s' skipping block_level='accept'", domain)
                 continue
 
-            block_level = utils.alias_block_level(block_level)
+            block_level = blocks.alias_block_level(block_level)
 
             logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(blocklist), domain, block_level)
             if len(blocklist) > 0:
@@ -206,7 +207,7 @@ def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
                 logger.debug("domain='%s': Skipping block_level='%s' ...", domain, block_level)
                 continue
 
-            block_level = utils.alias_block_level(block_level)
+            block_level = blocks.alias_block_level(block_level)
 
             logger.debug("Checking %d entries from domain='%s',block_level='%s' ...", len(info.items()), domain, block_level)
             for blocked, reason in info.items():
index 56a004364ca86177e568d7376566aaeb0d4dfb58..95dcd73e4d9c3959b6fe2382d148affd7708d4a6 100644 (file)
@@ -185,26 +185,3 @@ def deobfuscate(domain: str, blocker: str, domain_hash: str = None) -> str:
 
     logger.debug("domain='%s' - EXIT!", domain)
     return domain
-
-def alias_block_level(block_level: str) -> str:
-    logger.debug("block_level='%s' - CALLED!", block_level)
-    if not isinstance(block_level, str):
-        raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
-    elif block_level == "":
-        raise ValueError("Parameter 'block_level' is empty")
-
-    if block_level == "silence":
-        logger.debug("Block level 'silence' has been changed to 'silenced'")
-        block_level = "silenced"
-    elif block_level == "suspend":
-        logger.debug("Block level 'suspend' has been changed to 'suspended'")
-        block_level = "suspended"
-    elif block_level == "nsfw":
-        logger.debug("Block level 'nsfw' has been changed to 'media_nsfw'")
-        block_level = "media_nsfw"
-    elif block_level == "quarantined_instances":
-        logger.debug("Block level 'quarantined_instances' has been changed to 'quarantined'")
-        block_level = "quarantined"
-
-    logger.debug("block_level='%s' - EXIT!", block_level)
-    return block_level