]> git.mxchange.org Git - fba.git/blobdiff - api.py
Fixed some issues found by pylint:
[fba.git] / api.py
diff --git a/api.py b/api.py
index d2923495bbfa80d88c797c9dffc00f568190ceaf..3add50d2be4b84c9745021b6f33126c25bdbd121 100644 (file)
--- a/api.py
+++ b/api.py
@@ -27,7 +27,8 @@ import requests
 import re
 import validators
 
-from fba import *
+from fba import config
+from fba import fba
 
 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
 templates = Jinja2Templates(directory="templates")
@@ -47,27 +48,27 @@ def info():
 
 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
 def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
-    if blocked != None:
+    if blocked is not None:
         if blocked > 500:
             raise HTTPException(status_code=400, detail="Too many results")
         fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
-    elif blockers != None:
+    elif blockers is not None:
         if blockers > 500:
             raise HTTPException(status_code=400, detail="Too many results")
         fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
-    elif reference != None:
+    elif reference is not None:
         if reference > 500:
             raise HTTPException(status_code=400, detail="Too many results")
         fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
-    elif software != None:
+    elif software is not None:
         if software > 500:
             raise HTTPException(status_code=400, detail="Too many results")
         fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
-    elif originator != None:
+    elif originator is not None:
         if originator > 500:
             raise HTTPException(status_code=400, detail="Too many results")
         fba.cursor.execute("SELECT originator, COUNT(domain) FROM instances WHERE originator IS NOT NULL GROUP BY originator ORDER BY COUNT(domain) DESC, originator ASC LIMIT ?", [originator])
-    elif error_code != None:
+    elif error_code is not None:
         if error_code > 500:
             raise HTTPException(status_code=400, detail="Too many results")
         fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
@@ -88,20 +89,20 @@ def top(blocked: int = None, blockers: int = None, reference: int = None, softwa
 
 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
 def blocked(domain: str = None, reason: str = None, reverse: str = None):
-    if domain == None and reason == None and reverse == None:
+    if domain is None and reason is None and reverse is None:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    if reason != None:
+    if reason is not None:
         reason = re.sub("(%|_)", "", reason)
         if len(reason) < 3:
             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
 
-    if domain != None:
+    if domain is not None:
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
-    elif reverse != None:
+    elif reverse is not None:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
     else:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
@@ -110,7 +111,7 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
 
     result = {}
     for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
-        if reason != None and reason != "":
+        if reason is not None and reason != "":
             reason = reason.replace(",", " ").replace("  ", " ")
 
         entry = {
@@ -153,23 +154,23 @@ def mutual(domains: list[str] = Query()):
     return JSONResponse(status_code=200, content={})
 
 @router.get(config.get("base_url") + "/scoreboard")
-def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
-    if blockers != None and blockers > 0:
+def scoreboard(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
+    if blockers is not None and blockers > 0:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
-    elif blocked != None and blocked > 0:
+    elif blocked is not None and blocked > 0:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
-    elif reference != None and reference > 0:
+    elif reference is not None and reference > 0:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
-    elif software != None and software > 0:
+    elif software is not None and software > 0:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
-    elif originator != None and originator > 0:
+    elif originator is not None and originator > 0:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?originator={originator}")
-    elif error_code != None and error_code > 0:
+    elif error_code is not None and error_code > 0:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    if response == None:
+    if response is None:
         raise HTTPException(status_code=500, detail="Could not determine scores")
     elif not response.ok:
         raise HTTPException(status_code=response.status_code, detail=response.text)
@@ -202,7 +203,7 @@ def index(request: Request):
     })
 
 @router.get(config.get("base_url") + "/top")
-def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
+def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
     if domain == "" or reason == "" or reverse == "":
         raise HTTPException(status_code=500, detail="Insufficient parameter provided")
 
@@ -214,20 +215,20 @@ def index(request: Request, domain: str = None, reason: str = None, reverse: str
     info = response.json()
     response = None
 
-    if domain != None:
+    if domain is not None:
         if not validators.domain(domain):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
-    elif reason != None:
+    elif reason is not None:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
-    elif reverse != None:
+    elif reverse is not None:
         if not validators.domain(reverse):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
 
-    if response != None:
+    if response is not None:
         if not response.ok:
             raise HTTPException(status_code=response.status_code, detail=response.text)
         blocklist = response.json()
@@ -247,7 +248,7 @@ def index(request: Request, domain: str = None, reason: str = None, reverse: str
 
 @router.get(config.get("base_url") + "/rss")
 def rss(request: Request, domain: str = None):
-    if domain != None:
+    if domain is not None:
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
@@ -260,7 +261,7 @@ def rss(request: Request, domain: str = None):
     blocklist = []
     for blocker, blocked, block_level, reason, first_seen, last_seen in result:
         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
-        if reason == None or reason == '':
+        if reason is None or reason == '':
             reason = "No reason provided."
         else:
             reason = "Provided reason: '" + reason + "'"
@@ -270,7 +271,8 @@ def rss(request: Request, domain: str = None):
             "blocked"    : blocked,
             "block_level": block_level,
             "reason"     : reason,
-            "first_seen" : first_seen
+            "first_seen" : first_seen,
+            "last_seen"  : last_seen,
         })
 
     return templates.TemplateResponse("rss.xml", {