git.mxchange.org Git - fba.git/commitdiff
Continued:
author    Roland Häder <roland@mxchange.org>
          Mon, 26 Jun 2023 18:32:38 +0000 (20:32 +0200)
committer Roland Häder <roland@mxchange.org>
          Mon, 26 Jun 2023 18:44:35 +0000 (20:44 +0200)
- fixed some lint issues
- logged type of variable 'tag'
- changed single -> double quotes
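
For illustration only (not part of the commit): a minimal sketch of the type-logging pattern mentioned above, mirroring the changed lines in fba/csrf.py. The HTML snippet, parser choice, and logging setup here are stand-ins, not code from the repository.

    import logging

    from bs4 import BeautifulSoup  # assumed dependency; the csrf helper parses HTML the same way

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)

    html = '<meta name="csrf-token" content="example-token">'  # placeholder markup
    meta = BeautifulSoup(html, "html.parser")

    # Log both the type and the value of 'tag', and use double-quoted keys,
    # matching the quoting style this commit switches to.
    tag = meta.find("meta", attrs={"name": "csrf-token"})
    logger.debug("tag[%s]='%s'", type(tag), tag)

    if tag is not None:
        logger.debug("Adding CSRF token='%s' ...", tag["content"])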

api.py
fba/commands.py
fba/csrf.py
fba/helpers/software.py
fba/utils.py

diff --git a/api.py b/api.py
index 4f59c1b8614f6837bf6884f3ab5c64a7b20591f6..9b8276c0c30fd18bd56658ee1814bdf705aa03b3 100644 (file)
--- a/api.py
+++ b/api.py
@@ -14,8 +14,6 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-import os
-import os.path
 import re
 
 from datetime import datetime
@@ -31,7 +29,6 @@ from fastapi.templating import Jinja2Templates
 
 import uvicorn
 import requests
-import validators
 
 from fba import database
 from fba import utils
@@ -112,7 +109,7 @@ def api_index(request: Request, mode: str, value: str, amount: int):
     elif amount > config.get("api_limit"):
         raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
 
-    domain = whildchar = punycode = reason = None
+    domain = wildchar = punycode = reason = None
 
     if mode == "block_level":
         database.cursor.execute(
diff --git a/fba/commands.py b/fba/commands.py
index 82242dfdef08e98d6bc3051af9c22fd673d800ac..ed180bcb5eea632ed824403ca1863a89aec0fac0 100644 (file)
--- a/fba/commands.py
+++ b/fba/commands.py
@@ -85,7 +85,6 @@ def fetch_pixelfed_api(args: argparse.Namespace) -> int:
         logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
         return list()
 
-    domains = list()
     try:
         logger.debug("Fetching JSON from pixelfed.org API, headers()=%d ...", len(headers))
         fetched = network.get_json_api(
@@ -493,7 +492,7 @@ def fetch_todon_wiki(args: argparse.Namespace) -> int:
 
             logger.info("Adding new block: blocked='%s',block_level='%s'", blocked, block_level)
             if utils.process_block("todon.eu", blocked, None, block_level) and block_level == "reject" and config.get("bot_enabled"):
-                logger.debug("Appending blocked='%s',reason='%s' for blocker='todon.eu' ...", block["blocked"], block["block_level"])
+                logger.debug("Appending blocked='%s',reason='%s' for blocker='todon.eu' ...", blocked, block_level)
                 blockdict.append({
                     "blocked": blocked,
                     "reason" : None,
@@ -551,6 +550,7 @@ def fetch_cs(args: argparse.Namespace):
     domains["reject"] = federation.find_domains(blocked)
 
     logger.debug("domains[silenced]()=%d,domains[reject]()=%d", len(domains["silenced"]), len(domains["reject"]))
+    blockdict = list()
     if len(domains) > 0:
         locking.acquire()
 
@@ -677,7 +677,7 @@ def fetch_fbabot_atom(args: argparse.Namespace) -> int:
                     logger.debug("Adding domain='%s',domains()=%d", domain, len(domains))
                     domains.append(domain)
 
-    logger.debug("domains()='%d", len(domains))
+    logger.debug("domains()=%d", len(domains))
     if len(domains) > 0:
         locking.acquire()
 
@@ -801,72 +801,75 @@ def fetch_oliphant(args: argparse.Namespace) -> int:
         response = utils.fetch_url(f"{base_url}/{block['csv_url']}", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
 
         logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
-        if response.ok and response.content != "":
-            logger.debug("Fetched %d Bytes, parsing CSV ...", len(response.content))
-            reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
+        if not response.ok or response.status_code > 399 or response.content == "":
+            logger.warning("Could not fetch csv_url='%s' for blocker='%s' - SKIPPED!", block["csv_url"], block["blocker"])
+            continue
 
-            logger.debug("reader[]='%s'", type(reader))
-            blockdict = list()
-            for row in reader:
-                logger.debug("row[%s]='%s'", type(row), row)
-                domain = severity = None
-                reject_media = reject_reports = False
-                if "#domain" in row:
-                    domain = row["#domain"]
-                elif "domain" in row:
-                    domain = row["domain"]
-                else:
-                    logger.debug("row='%s' does not contain domain column", row)
-                    continue
+        logger.debug("Fetched %d Bytes, parsing CSV ...", len(response.content))
+        reader = csv.DictReader(response.content.decode('utf-8').splitlines(), dialect="unix")
 
-                if "#severity" in row:
-                    severity = row["#severity"]
-                elif "severity" in row:
-                    severity = row["severity"]
-                else:
-                    logger.debug("row='%s' does not contain severity column", row)
-                    continue
+        logger.debug("reader[]='%s'", type(reader))
+        blockdict = list()
+        for row in reader:
+            logger.debug("row[%s]='%s'", type(row), row)
+            domain = severity = None
+            reject_media = reject_reports = False
+            if "#domain" in row:
+                domain = row["#domain"]
+            elif "domain" in row:
+                domain = row["domain"]
+            else:
+                logger.debug("row='%s' does not contain domain column", row)
+                continue
 
-                if "#reject_media" in row and row["#reject_media"].lower() == "true":
-                    reject_media = True
-                elif "reject_media" in row and row["reject_media"].lower() == "true":
-                    reject_media = True
+            if "#severity" in row:
+                severity = row["#severity"]
+            elif "severity" in row:
+                severity = row["severity"]
+            else:
+                logger.debug("row='%s' does not contain severity column", row)
+                continue
 
-                if "#reject_reports" in row and row["#reject_reports"].lower() == "true":
-                    reject_reports = True
-                elif "reject_reports" in row and row["reject_reports"].lower() == "true":
-                    reject_reports = True
+            if "#reject_media" in row and row["#reject_media"].lower() == "true":
+                reject_media = True
+            elif "reject_media" in row and row["reject_media"].lower() == "true":
+                reject_media = True
 
-                logger.debug("domain='%s',severity='%s',reject_media='%s',reject_reports='%s'", domain, severity, reject_media, reject_reports)
-                if not utils.is_domain_wanted(domain):
-                    logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
-                    continue
+            if "#reject_reports" in row and row["#reject_reports"].lower() == "true":
+                reject_reports = True
+            elif "reject_reports" in row and row["reject_reports"].lower() == "true":
+                reject_reports = True
 
-                logger.debug("Marking domain='%s' as handled", domain)
-                domains.append(domain)
+            logger.debug("domain='%s',severity='%s',reject_media='%s',reject_reports='%s'", domain, severity, reject_media, reject_reports)
+            if not utils.is_domain_wanted(domain):
+                logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+                continue
 
-                logger.debug("Processing domain='%s' ...", domain)
-                processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
-                logger.debug("processed='%s'", processed)
+            logger.debug("Marking domain='%s' as handled", domain)
+            domains.append(domain)
 
-                if utils.process_block(block["blocker"], domain, None, "reject") and config.get("bot_enabled"):
-                    logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", domain, block["block_level"], block["blocker"])
-                    blockdict.append({
-                        "blocked": domain,
-                        "reason" : block["reason"],
-                    })
+            logger.debug("Processing domain='%s' ...", domain)
+            processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
+            logger.debug("processed='%s'", processed)
 
-                if reject_media:
-                    utils.process_block(block["blocker"], domain, None, "reject_media")
-                if reject_reports:
-                    utils.process_block(block["blocker"], domain, None, "reject_reports")
+            if utils.process_block(block["blocker"], domain, None, "reject") and config.get("bot_enabled"):
+                logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", domain, block["block_level"], block["blocker"])
+                blockdict.append({
+                    "blocked": domain,
+                    "reason" : block["reason"],
+                })
 
-            logger.debug("Invoking commit() ...")
-            database.connection.commit()
+            if reject_media:
+                utils.process_block(block["blocker"], domain, None, "reject_media")
+            if reject_reports:
+                utils.process_block(block["blocker"], domain, None, "reject_reports")
 
-            if config.get("bot_enabled") and len(blockdict) > 0:
-                logger.info("Sending bot POST for blocker='%s',blockdict()=%d ...", blocker, len(blockdict))
-                network.send_bot_post(block["blocker"], blockdict)
+        logger.debug("Invoking commit() ...")
+        database.connection.commit()
+
+        if config.get("bot_enabled") and len(blockdict) > 0:
+            logger.info("Sending bot POST for blocker='%s',blockdict()=%d ...", block["blocker"], len(blockdict))
+            network.send_bot_post(block["blocker"], blockdict)
 
     logger.debug("Success! - EXIT!")
     return 0
@@ -1065,7 +1068,7 @@ def fetch_joinfediverse(args: argparse.Namespace) -> int:
             continue
 
         logger.info("Proccessing blocked='%s' ...", block["blocked"])
-        processed = utils.process_domain(block["blocked"], "climatejustice.social", inspect.currentframe().f_code.co_name)
+        utils.process_domain(block["blocked"], "climatejustice.social", inspect.currentframe().f_code.co_name)
 
     blockdict = list()
     for blocker in domains:
@@ -1133,6 +1136,7 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
 
         logger.info("Checking %d block(s) from domain='%s' ...", len(blocking), row[0])
         obfuscated = 0
+        blockdict = list()
         for block in blocking:
             logger.debug("blocked='%s'", block["blocked"])
             blocked = None
@@ -1172,8 +1176,8 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
                     block["block_level"] = "suspended"
 
                 logger.info("blocked='%s' has been deobfuscated to blocked='%s', adding ...", block["blocked"], blocked)
-                if utils.process_block(row[0], blocked, block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
-                    logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], blocker)
+                if utils.process_block(row[0], block["blocked"], block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
+                    logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], row[0])
                     blockdict.append({
                         "blocked": block["blocked"],
                         "reason" : block["reason"],
@@ -1183,7 +1187,17 @@ def recheck_obfuscation(args: argparse.Namespace) -> int:
         if obfuscated == 0 and len(blocking) > 0:
             logger.info("Block list from domain='%s' has been fully deobfuscated.", row[0])
             instances.set_has_obfuscation(row[0], False)
+
+        if instances.has_pending(row[0]):
+            logger.debug("Flushing updates for blocker='%s' ...", row[0])
             instances.update_data(row[0])
 
+        logger.debug("Invoking commit() ...")
+        database.connection.commit()
+
+        if config.get("bot_enabled") and len(blockdict) > 0:
+            logger.info("Sending bot POST for blocker='%s,blockdict()=%d ...", row[0], len(blockdict))
+            network.send_bot_post(row[0], blockdict)
+
     logger.debug("Success! - EXIT!")
     return 0
diff --git a/fba/csrf.py b/fba/csrf.py
index 00315d805d42d4df6c4af4bd541ff815ea5c830d..e68fcb63ef0ee25eebafcb09fc39afca50957e9a 100644 (file)
--- a/fba/csrf.py
+++ b/fba/csrf.py
@@ -60,9 +60,9 @@ def determine(domain: str, headers: dict) -> dict:
         logger.debug("meta[]='%s'", type(meta))
         tag = meta.find("meta", attrs={"name": "csrf-token"})
 
-        logger.debug("tag='%s'", tag)
+        logger.debug("tag[%s]='%s'", type(tag), tag)
         if tag is not None:
-            logger.debug("Adding CSRF token='%s' for domain='%s'", tag['content'], domain)
+            logger.debug("Adding CSRF token='%s' for domain='%s'", tag["content"], domain)
             reqheaders["X-CSRF-Token"] = tag["content"]
 
     logger.debug("reqheaders()=%d - EXIT!", len(reqheaders))
diff --git a/fba/helpers/software.py b/fba/helpers/software.py
index 2308d2cb3bb87a26d9eaffe5360cb81f9b43712b..30e411821409ecbd7c6f303087b450730e44551d 100644 (file)
--- a/fba/helpers/software.py
+++ b/fba/helpers/software.py
@@ -17,6 +17,7 @@
 import logging
 
 from fba.helpers import tidyup
+from fba.helpers import version
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
diff --git a/fba/utils.py b/fba/utils.py
index 8814ea1d9d3d53c66fdc7f39b988b831077940cb..bbbf3b4441f825246bd9f0c34bbe75beec0720ce 100644 (file)
--- a/fba/utils.py
+++ b/fba/utils.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2023 Free Software Foundation
+       # Copyright (C) 2023 Free Software Foundation
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published
@@ -23,7 +23,6 @@ import requests
 import validators
 
 from fba.helpers import blacklist
-from fba.helpers import config
 from fba.helpers import domain as domain_helper
 from fba.helpers import tidyup