]> git.mxchange.org Git - fba.git/commitdiff
Continued:
authorRoland Häder <roland@mxchange.org>
Tue, 11 Jul 2023 12:00:27 +0000 (14:00 +0200)
committerRoland Häder <roland@mxchange.org>
Tue, 11 Jul 2023 12:10:06 +0000 (14:10 +0200)
- fixed issues from pylint

daemon.py
fba/commands.py
fba/helpers/domain.py
fba/helpers/processing.py
fba/http/federation.py
fba/models/blocks.py
fba/models/instances.py
fba/models/sources.py
fba/networks/lemmy.py

index 4e22ce97602b3c8ba38bdc716da693cd47d2c5f1..a6135a1eb3b50d11049416da4f41d1ad1356c1ec 100755 (executable)
--- a/daemon.py
+++ b/daemon.py
@@ -111,8 +111,6 @@ def api_list(request: Request, mode: str, value: str, amount: int):
     elif amount > config.get("api_limit"):
         raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
 
-    domain = wildchar = punycode = reason = None
-
     if mode in ("detection_mode", "software", "command"):
         database.cursor.execute(
             f"SELECT domain, origin, software, detection_mode, command, total_peers, total_blocks, first_seen, last_updated \
@@ -326,10 +324,10 @@ def list_domains(request: Request, mode: str, value: str, amount: int = config.g
     domainlist = list()
     if response is not None and response.ok:
         domainlist = response.json()
-        format = config.get("timestamp_format")
+        tformat = config.get("timestamp_format")
         for row in domainlist:
-            row["first_seen"]   = datetime.utcfromtimestamp(row["first_seen"]).strftime(format)
-            row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(format) if isinstance(row["last_updated"], float) else None
+            row["first_seen"]   = datetime.utcfromtimestamp(row["first_seen"]).strftime(tformat)
+            row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(tformat) if isinstance(row["last_updated"], float) else None
 
     return templates.TemplateResponse("views/list.html", {
         "request"   : request,
@@ -356,11 +354,11 @@ def top(request: Request, mode: str, value: str, amount: int = config.get("api_l
     if response.ok and response.status_code == 200 and len(response.text) > 0:
         blocklist = response.json()
 
-        format = config.get("timestamp_format")
+        tformat = config.get("timestamp_format")
         for block_level in blocklist:
             for row in blocklist[block_level]:
-                row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(format)
-                row["last_seen"]  = datetime.utcfromtimestamp(row["last_seen"]).strftime(format) if isinstance(row["last_seen"], float) else None
+                row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(tformat)
+                row["last_seen"]  = datetime.utcfromtimestamp(row["last_seen"]).strftime(tformat) if isinstance(row["last_seen"], float) else None
                 found = found + 1
 
     return templates.TemplateResponse("views/top.html", {
@@ -393,12 +391,12 @@ def infos(request: Request, domain: str):
     domain_data = response.json()
 
     # Format timestamps
-    format = config.get("timestamp_format")
+    tformat = config.get("timestamp_format")
     instance = dict()
     for key in domain_data.keys():
         if key in ["last_nodeinfo", "last_blocked", "first_seen", "last_updated", "last_instance_fetch"] and isinstance(domain_data[key], float):
             # Timestamps
-            instance[key] = datetime.utcfromtimestamp(domain_data[key]).strftime(format)
+            instance[key] = datetime.utcfromtimestamp(domain_data[key]).strftime(tformat)
         else:
             # Generic
             instance[key] = domain_data[key]
index a58440b77dff126cf6b4978f13530c699780877d..5047b5a9ce50f19651f1d9806301ef67948dac9b 100644 (file)
@@ -316,7 +316,6 @@ def fetch_blocks(args: argparse.Namespace) -> int:
         instances.set_has_obfuscation(blocker, False)
 
         blocking = list()
-        blockdict = list()
         if software == "pleroma":
             logger.info("blocker='%s',software='%s'", blocker, software)
             blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
@@ -1454,9 +1453,10 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
             logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", row["domain"], len(blocking))
             instances.set_total_blocks(row["domain"], blocking)
 
-        logger.info("Checking %d block(s) from domain='%s' ...", len(blocking), row["domain"])
         obfuscated = 0
         blockdict = list()
+
+        logger.info("Checking %d block(s) from domain='%s' ...", len(blocking), row["domain"])
         for block in blocking:
             logger.debug("block[blocked]='%s'", block["blocked"])
             blocked = None
@@ -1551,13 +1551,12 @@ def fetch_fedilist(args: argparse.Namespace) -> int:
 
     logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
     if not response.ok or response.status_code >= 300 or len(response.content) == 0:
-        logger.warning("Failed fetching url='%s': response.ok='%s',response.status_code=%d,response.content()=%d - EXIT!", response.ok, response.status_code, len(response.text))
+        logger.warning("Failed fetching url='%s': response.ok='%s',response.status_code=%d,response.content()=%d - EXIT!", url, response.ok, response.status_code, len(response.text))
         return 1
 
     reader = csv.DictReader(response.content.decode("utf-8").splitlines(), dialect="unix")
 
     logger.debug("reader[]='%s'", type(reader))
-    blockdict = list()
     for row in reader:
         logger.debug("row[]='%s'", type(row))
         domain = tidyup.domain(row["hostname"])
@@ -1575,7 +1574,7 @@ def fetch_fedilist(args: argparse.Namespace) -> int:
             logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
             continue
         elif (args.all is None or not args.all) and instances.is_registered(domain):
-            logger.debug("domain='%s' is already registered, --all not specified: args.all[]='%s'", type(args.all))
+            logger.debug("domain='%s' is already registered, --all not specified: args.all[]='%s'", domain, type(args.all))
             continue
         elif instances.is_recent(domain):
             logger.debug("domain='%s' has been recently crawled - SKIPPED!", domain)
index 1830e3f41243a0926c6dd9c4485d8f074d49d41e..a38847963c7ebcc71e36906341a35980688fd020 100644 (file)
@@ -47,7 +47,7 @@ def is_in_url(domain: str, url: str) -> bool:
     raise_on(domain)
 
     if not isinstance(url, str):
-        raise ValueError(f"Parameter url[]='%s' is not of type 'str'", type(url))
+        raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
     elif url == "":
         raise ValueError("Parameter 'url' is empty")
 
index 426ee2627d7d93333c10c79fe0ba0439fad916cd..352ab31b6a6d381915793339757a53d28e88a232 100644 (file)
@@ -15,6 +15,8 @@
 
 import logging
 
+from fba import utils
+
 from fba.helpers import domain as domain_helper
 
 from fba.http import federation
@@ -26,9 +28,9 @@ from fba.models import instances
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-def domain(domain: str, blocker: str, command: str) -> bool:
-    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
-    domain_helper.raise_on(domain)
+def domain(name: str, blocker: str, command: str) -> bool:
+    logger.debug("name='%s',blocker='%s',command='%s' - CALLED!", name, blocker, command)
+    domain_helper.raise_on(name)
     domain_helper.raise_on(blocker)
 
     if not isinstance(command, str):
@@ -36,34 +38,34 @@ def domain(domain: str, blocker: str, command: str) -> bool:
     elif command == "":
         raise ValueError("Parameter 'command' is empty")
 
-    logger.debug("domain='%s' - BEFORE!", domain)
-    domain = deobfuscate(domain, blocker)
+    logger.debug("name='%s' - BEFORE!", name)
+    name = utils.deobfuscate(name, blocker)
 
-    logger.debug("domain='%s' - DEOBFUSCATED!", domain)
+    logger.debug("name='%s' - DEOBFUSCATED!", name)
     if instances.has_pending(blocker):
         logger.debug("Flushing updates for blocker='%s' ...", blocker)
         instances.update_data(blocker)
 
-    if not is_domain_wanted(domain):
-        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+    if not utils.is_domain_wanted(name):
+        logger.debug("name='%s' is not wanted - SKIPPED!", name)
         return False
-    elif instances.is_recent(domain):
-        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
+    elif instances.is_recent(name):
+        logger.debug("name='%s' has been recently checked - SKIPPED!", name)
         return False
 
     processed = False
     try:
-        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
-        federation.fetch_instances(domain, blocker, None, command)
+        logger.info("Fetching instances for name='%s',blocker='%s',command='%s' ...", name, blocker, command)
+        federation.fetch_instances(name, blocker, None, command)
         processed = True
     except network.exceptions as exception:
-        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
-        instances.set_last_error(domain, exception)
+        logger.warning("Exception '%s' during fetching instances (%s) from name='%s'", type(exception), command, name)
+        instances.set_last_error(name, exception)
 
-    logger.debug("Checking if domain='%s' has pending updates ...", domain)
-    if instances.has_pending(domain):
-        logger.debug("Flushing updates for domain='%s' ...", domain)
-        instances.update_data(domain)
+    logger.debug("Checking if name='%s' has pending updates ...", name)
+    if instances.has_pending(name):
+        logger.debug("Flushing updates for name='%s' ...", name)
+        instances.update_data(name)
 
     logger.debug("processed='%s' - EXIT!", processed)
     return processed
index 1418755821c1b761eea59a6790e0c8db9e038331..b4477cd32cbd12ab5c0de6fdaf59714f5b6f62d9 100644 (file)
@@ -18,6 +18,7 @@ import logging
 from urllib.parse import urlparse
 
 import bs4
+import requests
 import validators
 
 from fba import csrf
index 2b484d5c7fb90bf1bdbcb8bae035c1463b025c89..1a861c5ea3f7981e29bfa502bc41f8a623934f1c 100644 (file)
@@ -181,28 +181,28 @@ def valid(value: str, column: str) -> bool:
     elif value == "":
         raise ValueError("Parameter 'value' is empty")
     elif not isinstance(column, str):
-        raise columnError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+        raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
     elif column == "":
-        raise columnError("Parameter 'column' is empty")
+        raise ValueError("Parameter 'column' is empty")
 
     # Query database
     database.cursor.execute(
         f"SELECT {column} FROM blocks WHERE {column} = ? LIMIT 1", [value]
     )
 
-    valid = database.cursor.fetchone() is not None
+    is_valid = database.cursor.fetchone() is not None
 
-    logger.debug("valid='%s' - EXIT!", valid)
-    return valid
+    logger.debug("is_valid='%s' - EXIT!", is_valid)
+    return is_valid
 
 def translate_idnas(rows: list, column: str):
     logger.debug("rows[]='%s' - CALLED!", type(rows))
     if not isinstance(rows, list):
-        raise ValueError(f"rows[]='%s' is not of type 'list'", type(rows))
+        raise ValueError(f"rows[]='{type(rows)}' is not of type 'list'")
     elif len(rows) == 0:
         raise ValueError("Parameter 'rows' is an empty list")
     elif not isinstance(column, str):
-        raise ValueError(f"column='%s' is not of type 'str'", type(column))
+        raise ValueError(f"column='{type(column)}' is not of type 'str'")
     elif column == "":
         raise ValueError("Parameter 'column' is empty")
     elif column not in ["blocker", "blocked"]:
@@ -227,7 +227,7 @@ def translate_idnas(rows: list, column: str):
 def alias_block_level(block_level: str) -> str:
     logger.debug("block_level='%s' - CALLED!", block_level)
     if not isinstance(block_level, str):
-        raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
+        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
     elif block_level == "":
         raise ValueError("Parameter 'block_level' is empty")
 
index 1b8450499fbb7594c18ee738fef5fe2279751d68..5f7af4529fef7aac9365800f53af6eb927dd1d15 100644 (file)
@@ -273,7 +273,7 @@ def set_success(domain: str):
 def is_registered(domain: str, skip_raise = False) -> bool:
     logger.debug("domain='%s',skip_raise='%s' - CALLED!", domain, skip_raise)
     if not isinstance(skip_raise, bool):
-        raise ValueError(f"skip_raise[]='%s' is not type of 'bool'", type(skip_raise))
+        raise ValueError(f"skip_raise[]='{type(skip_raise)}' is not type of 'bool'")
 
     if not skip_raise:
         domain_helper.raise_on(domain)
@@ -326,7 +326,7 @@ def deobfuscate(char: str, domain: str, blocked_hash: str = None) -> tuple:
     elif not char in domain:
         raise ValueError(f"char='{char}' not found in domain='{domain}' but function invoked")
     elif not isinstance(domain, str):
-        raise ValueError(f"Parameter domain[]='%s'", type(domain))
+        raise ValueError(f"Parameter domain[]='{type(domain)}'")
     elif domain == "":
         raise ValueError("Parameter 'domain' is empty")
     elif not isinstance(blocked_hash, str) and blocked_hash is not None:
@@ -354,7 +354,7 @@ def deobfuscate(char: str, domain: str, blocked_hash: str = None) -> tuple:
         logger.debug("domain='%s' - AFTER!", domain)
 
         if domain == "":
-            debug.warning("domain is empty after tidyup - EXIT!")
+            logger.warning("domain is empty after tidyup - EXIT!")
             return None
 
         search = domain.replace(char, "_")
@@ -465,28 +465,28 @@ def valid(value: str, column: str) -> bool:
     elif value == "":
         raise ValueError("Parameter 'value' is empty")
     elif not isinstance(column, str):
-        raise columnError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+        raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
     elif column == "":
-        raise columnError("Parameter 'column' is empty")
+        raise ValueError("Parameter 'column' is empty")
 
     # Query database
     database.cursor.execute(
         f"SELECT {column} FROM instances WHERE {column} = ? LIMIT 1", [value]
     )
 
-    valid = database.cursor.fetchone() is not None
+    is_valid = database.cursor.fetchone() is not None
 
-    logger.debug("valid='%s' - EXIT!", valid)
-    return valid
+    logger.debug("is_valid='%s' - EXIT!", is_valid)
+    return is_valid
 
 def translate_idnas(rows: list, column: str):
     logger.debug("rows[]='%s' - CALLED!", type(rows))
     if not isinstance(rows, list):
-        raise ValueError(f"rows[]='%s' is not of type 'list'", type(rows))
+        raise ValueError("rows[]='{type(rows)}' is not of type 'list'")
     elif len(rows) == 0:
         raise ValueError("Parameter 'rows' is an empty list")
     elif not isinstance(column, str):
-        raise ValueError(f"column='%s' is not of type 'str'", type(column))
+        raise ValueError(f"column='{type(column)}' is not of type 'str'")
     elif column == "":
         raise ValueError("Parameter 'column' is empty")
     elif column not in ["domain", "origin"]:
index 75ddcf4971a8389c8249b84d97d76b1a2c363c2c..0fec36436e164252c7b84aef1305f395cc2ac683 100644 (file)
@@ -40,16 +40,16 @@ def is_recent(source_domain: str) -> bool:
     logger.debug("source_domain='%s' - CALLED!", source_domain)
     domain_helper.raise_on(source_domain)
 
-    is_recent = False
+    recent = False
 
     row = fetch(source_domain)
     logger.debug("row[]='%s'", type(row))
     if row is not None:
         logger.debug("source_domain='%s',row[last_accessed]=%d", source_domain, row["last_accessed"])
-        is_recent = (time.time() - row["last_accessed"]) <= config.get("source_last_access")
+        recent = (time.time() - row["last_accessed"]) <= config.get("source_last_access")
 
-    logger.debug("is_recent='%s' - EXIT!", is_recent)
-    return is_recent
+    logger.debug("recent='%s' - EXIT!", recent)
+    return recent
 
 def update (source_domain: str):
     logger.debug("source_domain='%s' - CALLED!", source_domain)
index c64ae477f1ee055703c9820ec6e5afb6d80144b7..238f673c7fc4945a408d5efb30eacd8f8599a393 100644 (file)
@@ -288,7 +288,7 @@ def parse_script(doc: bs4.BeautifulSoup, only: str = None) -> list:
     logger.debug("doc[]='%s',only='%s' - CALLED!")
     if not isinstance(doc, bs4.BeautifulSoup):
         raise ValueError(f"Parameter doc[]='{type(only)}' is not of type 'bs4.BeautifulSoup'")
-    elif not isinstance(only, str) and only != None:
+    elif not isinstance(only, str) and only is not None:
         raise ValueError(f"Parameter only[]='{type(only)}' is not of type 'str'")
     elif isinstance(only, str) and only == "":
         raise ValueError("Parameter 'only' is empty")