]> git.mxchange.org Git - fba.git/blobdiff - api.py
Continued:
[fba.git] / api.py
diff --git a/api.py b/api.py
index ec459f5987a95cb58e8f5f0f869457cbe8fbb3a4..3ee3c5c49aaf5926bdb0d352e3b11176619f522b 100644 (file)
--- a/api.py
+++ b/api.py
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+from datetime import datetime
+from email import utils
+
+import re
+
 from fastapi import Request, HTTPException, Query
 from fastapi.responses import JSONResponse
 from fastapi.responses import PlainTextResponse
 from fastapi.templating import Jinja2Templates
-from datetime import datetime
-from email import utils
 
 import fastapi
 import uvicorn
 import requests
-import re
 import validators
 
-from fba import *
+from fba import config
+from fba import fba
+from fba import network
 
 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
 templates = Jinja2Templates(directory="templates")
 
 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
-def info():
-    fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
+def api_info():
+    fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
     known, indexed, blocklist, errorous = fba.cursor.fetchone()
 
     return {
@@ -46,62 +50,51 @@ def info():
     }
 
 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
-    if blocked != None:
-        if blocked > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
-    elif blockers != None:
-        if blockers > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
-    elif reference != None:
-        if reference > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
-    elif software != None:
-        if software > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
-    elif originator != None:
-        if originator > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT originator, COUNT(domain) FROM instances WHERE originator IS NOT NULL GROUP BY originator ORDER BY COUNT(domain) DESC, originator ASC LIMIT ?", [originator])
-    elif error_code != None:
-        if error_code > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
+def api_top(mode: str, amount: int):
+    if amount > 500:
+        raise HTTPException(status_code=400, detail="Too many results")
+
+    if mode == "blocked":
+        fba.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
+    elif mode == "blocker":
+        fba.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
+    elif mode == "reference":
+        fba.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
+    elif mode == "software":
+        fba.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
+    elif mode == "command":
+        fba.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
+    elif mode == "error_code":
+        fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    scores = fba.cursor.fetchall()
-
-    scoreboard = []
+    scores = list()
 
-    for domain, highscore in scores:
-        scoreboard.append({
-            "domain"   : domain,
-            "highscore": highscore
+    for domain, score in fba.cursor.fetchall():
+        scores.append({
+            "domain": domain,
+            "score" : score
         })
 
-    return scoreboard
+    return scores
 
 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
-def blocked(domain: str = None, reason: str = None, reverse: str = None):
-    if domain == None and reason == None and reverse == None:
+def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
+    if domain is None and reason is None and reverse is None:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    if reason != None:
+    if reason is not None:
         reason = re.sub("(%|_)", "", reason)
         if len(reason) < 3:
             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
 
-    if domain != None:
+    if domain is not None:
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
-    elif reverse != None:
+    elif reverse is not None:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
     else:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
@@ -110,6 +103,9 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
 
     result = {}
     for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
+        if reason is not None and reason != "":
+            reason = reason.replace(",", " ").replace("  ", " ")
+
         entry = {
             "blocker"   : blocker,
             "blocked"   : blocked,
@@ -117,6 +113,7 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
             "first_seen": first_seen,
             "last_seen" : last_seen
         }
+
         if block_level in result:
             result[block_level].append(entry)
         else:
@@ -125,7 +122,7 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
     return result
 
 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
-def mutual(domains: list[str] = Query()):
+def api_mutual(domains: list[str] = Query()):
     """Return 200 if federation is open between the two, 4xx otherwise"""
     fba.cursor.execute(
         "SELECT block_level FROM blocks " \
@@ -149,23 +146,25 @@ def mutual(domains: list[str] = Query()):
     return JSONResponse(status_code=200, content={})
 
 @router.get(config.get("base_url") + "/scoreboard")
-def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
-    if blockers != None and blockers > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
-    elif blocked != None and blocked > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
-    elif reference != None and reference > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
-    elif software != None and software > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
-    elif originator != None and originator > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?originator={originator}")
-    elif error_code != None and error_code > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
+def scoreboard(request: Request, mode: str, amount: int):
+    response = None
+
+    if mode == "blocker" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocker&amount={amount}")
+    elif mode == "blocked" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocked&amount={amount}")
+    elif mode == "reference" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=reference&amount={amount}")
+    elif mode == "software" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=software&amount={amount}")
+    elif mode == "command" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=command&amount={amount}")
+    elif mode == "error_code" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=error_code&amount={amount}")
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    if response == None:
+    if response is None:
         raise HTTPException(status_code=500, detail="Could not determine scores")
     elif not response.ok:
         raise HTTPException(status_code=response.status_code, detail=response.text)
@@ -175,13 +174,9 @@ def index(request: Request, blockers: int = None, blocked: int = None, reference
         "slogan"    : config.get("slogan"),
         "request"   : request,
         "scoreboard": True,
-        "blockers"  : blockers,
-        "blocked"   : blocked,
-        "reference" : reference,
-        "software"  : software,
-        "originator": originator,
-        "error_code": error_code,
-        "scores"    : fba.json_from_response(response)
+        "mode"      : mode,
+        "amount"    : amount,
+        "scores"    : network.json_from_response(response)
     })
 
 @router.get(config.get("base_url") + "/")
@@ -198,7 +193,7 @@ def index(request: Request):
     })
 
 @router.get(config.get("base_url") + "/top")
-def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
+def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
     if domain == "" or reason == "" or reverse == "":
         raise HTTPException(status_code=500, detail="Insufficient parameter provided")
 
@@ -210,20 +205,20 @@ def index(request: Request, domain: str = None, reason: str = None, reverse: str
     info = response.json()
     response = None
 
-    if domain != None:
+    if domain is not None:
         if not validators.domain(domain):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
-    elif reason != None:
+    elif reason is not None:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
-    elif reverse != None:
+    elif reverse is not None:
         if not validators.domain(reverse):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
 
-    if response != None:
+    if response is not None:
         if not response.ok:
             raise HTTPException(status_code=response.status_code, detail=response.text)
         blocklist = response.json()
@@ -243,7 +238,7 @@ def index(request: Request, domain: str = None, reason: str = None, reverse: str
 
 @router.get(config.get("base_url") + "/rss")
 def rss(request: Request, domain: str = None):
-    if domain != None:
+    if domain is not None:
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
@@ -256,7 +251,7 @@ def rss(request: Request, domain: str = None):
     blocklist = []
     for blocker, blocked, block_level, reason, first_seen, last_seen in result:
         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
-        if reason == None or reason == '':
+        if reason is None or reason == '':
             reason = "No reason provided."
         else:
             reason = "Provided reason: '" + reason + "'"
@@ -266,7 +261,8 @@ def rss(request: Request, domain: str = None):
             "blocked"    : blocked,
             "block_level": block_level,
             "reason"     : reason,
-            "first_seen" : first_seen
+            "first_seen" : first_seen,
+            "last_seen"  : last_seen,
         })
 
     return templates.TemplateResponse("rss.xml", {