]> git.mxchange.org Git - fba.git/blobdiff - api.py
Continued:
[fba.git] / api.py
diff --git a/api.py b/api.py
index ec459f5987a95cb58e8f5f0f869457cbe8fbb3a4..3ee3c5c49aaf5926bdb0d352e3b11176619f522b 100644 (file)
--- a/api.py
+++ b/api.py
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+from datetime import datetime
+from email import utils
+
+import re
+
 from fastapi import Request, HTTPException, Query
 from fastapi.responses import JSONResponse
 from fastapi.responses import PlainTextResponse
 from fastapi.templating import Jinja2Templates
 from fastapi import Request, HTTPException, Query
 from fastapi.responses import JSONResponse
 from fastapi.responses import PlainTextResponse
 from fastapi.templating import Jinja2Templates
-from datetime import datetime
-from email import utils
 
 import fastapi
 import uvicorn
 import requests
 
 import fastapi
 import uvicorn
 import requests
-import re
 import validators
 
 import validators
 
-from fba import *
+from fba import config
+from fba import fba
+from fba import network
 
 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
 templates = Jinja2Templates(directory="templates")
 
 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
 
 router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
 templates = Jinja2Templates(directory="templates")
 
 @router.get(config.get("base_url") + "/api/info.json", response_class=JSONResponse)
-def info():
-    fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'gotosocial', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
+def api_info():
+    fba.cursor.execute("SELECT (SELECT COUNT(domain) FROM instances), (SELECT COUNT(domain) FROM instances WHERE software IN ('pleroma', 'mastodon', 'misskey', 'friendica', 'bookwyrm', 'takahe', 'peertube')), (SELECT COUNT(blocker) FROM blocks), (SELECT COUNT(domain) FROM instances WHERE last_status_code IS NOT NULL)")
     known, indexed, blocklist, errorous = fba.cursor.fetchone()
 
     return {
     known, indexed, blocklist, errorous = fba.cursor.fetchone()
 
     return {
@@ -46,62 +50,51 @@ def info():
     }
 
 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
     }
 
 @router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
-def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
-    if blocked != None:
-        if blocked > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
-    elif blockers != None:
-        if blockers > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
-    elif reference != None:
-        if reference > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
-    elif software != None:
-        if software > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
-    elif originator != None:
-        if originator > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT originator, COUNT(domain) FROM instances WHERE originator IS NOT NULL GROUP BY originator ORDER BY COUNT(domain) DESC, originator ASC LIMIT ?", [originator])
-    elif error_code != None:
-        if error_code > 500:
-            raise HTTPException(status_code=400, detail="Too many results")
-        fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
+def api_top(mode: str, amount: int):
+    if amount > 500:
+        raise HTTPException(status_code=400, detail="Too many results")
+
+    if mode == "blocked":
+        fba.cursor.execute("SELECT blocked, COUNT(blocked) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY score DESC LIMIT ?", [amount])
+    elif mode == "blocker":
+        fba.cursor.execute("SELECT blocker, COUNT(blocker) AS score FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY score DESC LIMIT ?", [amount])
+    elif mode == "reference":
+        fba.cursor.execute("SELECT origin, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY score DESC LIMIT ?", [amount])
+    elif mode == "software":
+        fba.cursor.execute("SELECT software, COUNT(domain) AS score FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY score DESC, software ASC LIMIT ?", [amount])
+    elif mode == "command":
+        fba.cursor.execute("SELECT command, COUNT(domain) AS score FROM instances WHERE command IS NOT NULL GROUP BY command ORDER BY score DESC, command ASC LIMIT ?", [amount])
+    elif mode == "error_code":
+        fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS score FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY score DESC LIMIT ?", [amount])
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    scores = fba.cursor.fetchall()
-
-    scoreboard = []
+    scores = list()
 
 
-    for domain, highscore in scores:
-        scoreboard.append({
-            "domain"   : domain,
-            "highscore": highscore
+    for domain, score in fba.cursor.fetchall():
+        scores.append({
+            "domain": domain,
+            "score" : score
         })
 
         })
 
-    return scoreboard
+    return scores
 
 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
 
 @router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
-def blocked(domain: str = None, reason: str = None, reverse: str = None):
-    if domain == None and reason == None and reverse == None:
+def api_blocked(domain: str = None, reason: str = None, reverse: str = None):
+    if domain is None and reason is None and reverse is None:
         raise HTTPException(status_code=400, detail="No filter specified")
 
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    if reason != None:
+    if reason is not None:
         reason = re.sub("(%|_)", "", reason)
         if len(reason) < 3:
             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
 
         reason = re.sub("(%|_)", "", reason)
         if len(reason) < 3:
             raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
 
-    if domain != None:
+    if domain is not None:
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
                   (domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
-    elif reverse != None:
+    elif reverse is not None:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
     else:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
     else:
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
@@ -110,6 +103,9 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
 
     result = {}
     for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
 
     result = {}
     for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
+        if reason is not None and reason != "":
+            reason = reason.replace(",", " ").replace("  ", " ")
+
         entry = {
             "blocker"   : blocker,
             "blocked"   : blocked,
         entry = {
             "blocker"   : blocker,
             "blocked"   : blocked,
@@ -117,6 +113,7 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
             "first_seen": first_seen,
             "last_seen" : last_seen
         }
             "first_seen": first_seen,
             "last_seen" : last_seen
         }
+
         if block_level in result:
             result[block_level].append(entry)
         else:
         if block_level in result:
             result[block_level].append(entry)
         else:
@@ -125,7 +122,7 @@ def blocked(domain: str = None, reason: str = None, reverse: str = None):
     return result
 
 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
     return result
 
 @router.get(config.get("base_url") + "/api/mutual.json", response_class=JSONResponse)
-def mutual(domains: list[str] = Query()):
+def api_mutual(domains: list[str] = Query()):
     """Return 200 if federation is open between the two, 4xx otherwise"""
     fba.cursor.execute(
         "SELECT block_level FROM blocks " \
     """Return 200 if federation is open between the two, 4xx otherwise"""
     fba.cursor.execute(
         "SELECT block_level FROM blocks " \
@@ -149,23 +146,25 @@ def mutual(domains: list[str] = Query()):
     return JSONResponse(status_code=200, content={})
 
 @router.get(config.get("base_url") + "/scoreboard")
     return JSONResponse(status_code=200, content={})
 
 @router.get(config.get("base_url") + "/scoreboard")
-def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
-    if blockers != None and blockers > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
-    elif blocked != None and blocked > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
-    elif reference != None and reference > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
-    elif software != None and software > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
-    elif originator != None and originator > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?originator={originator}")
-    elif error_code != None and error_code > 0:
-        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
+def scoreboard(request: Request, mode: str, amount: int):
+    response = None
+
+    if mode == "blocker" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocker&amount={amount}")
+    elif mode == "blocked" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=blocked&amount={amount}")
+    elif mode == "reference" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=reference&amount={amount}")
+    elif mode == "software" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=software&amount={amount}")
+    elif mode == "command" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=command&amount={amount}")
+    elif mode == "error_code" and amount > 0:
+        response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode=error_code&amount={amount}")
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
     else:
         raise HTTPException(status_code=400, detail="No filter specified")
 
-    if response == None:
+    if response is None:
         raise HTTPException(status_code=500, detail="Could not determine scores")
     elif not response.ok:
         raise HTTPException(status_code=response.status_code, detail=response.text)
         raise HTTPException(status_code=500, detail="Could not determine scores")
     elif not response.ok:
         raise HTTPException(status_code=response.status_code, detail=response.text)
@@ -175,13 +174,9 @@ def index(request: Request, blockers: int = None, blocked: int = None, reference
         "slogan"    : config.get("slogan"),
         "request"   : request,
         "scoreboard": True,
         "slogan"    : config.get("slogan"),
         "request"   : request,
         "scoreboard": True,
-        "blockers"  : blockers,
-        "blocked"   : blocked,
-        "reference" : reference,
-        "software"  : software,
-        "originator": originator,
-        "error_code": error_code,
-        "scores"    : fba.json_from_response(response)
+        "mode"      : mode,
+        "amount"    : amount,
+        "scores"    : network.json_from_response(response)
     })
 
 @router.get(config.get("base_url") + "/")
     })
 
 @router.get(config.get("base_url") + "/")
@@ -198,7 +193,7 @@ def index(request: Request):
     })
 
 @router.get(config.get("base_url") + "/top")
     })
 
 @router.get(config.get("base_url") + "/top")
-def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
+def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
     if domain == "" or reason == "" or reverse == "":
         raise HTTPException(status_code=500, detail="Insufficient parameter provided")
 
     if domain == "" or reason == "" or reverse == "":
         raise HTTPException(status_code=500, detail="Insufficient parameter provided")
 
@@ -210,20 +205,20 @@ def index(request: Request, domain: str = None, reason: str = None, reverse: str
     info = response.json()
     response = None
 
     info = response.json()
     response = None
 
-    if domain != None:
+    if domain is not None:
         if not validators.domain(domain):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
         if not validators.domain(domain):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
-    elif reason != None:
+    elif reason is not None:
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
-    elif reverse != None:
+    elif reverse is not None:
         if not validators.domain(reverse):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
 
         if not validators.domain(reverse):
             raise HTTPException(status_code=500, detail="Invalid domain")
 
         response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
 
-    if response != None:
+    if response is not None:
         if not response.ok:
             raise HTTPException(status_code=response.status_code, detail=response.text)
         blocklist = response.json()
         if not response.ok:
             raise HTTPException(status_code=response.status_code, detail=response.text)
         blocklist = response.json()
@@ -243,7 +238,7 @@ def index(request: Request, domain: str = None, reason: str = None, reverse: str
 
 @router.get(config.get("base_url") + "/rss")
 def rss(request: Request, domain: str = None):
 
 @router.get(config.get("base_url") + "/rss")
 def rss(request: Request, domain: str = None):
-    if domain != None:
+    if domain is not None:
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
         wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
         punycode = domain.encode('idna').decode('utf-8')
         fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
@@ -256,7 +251,7 @@ def rss(request: Request, domain: str = None):
     blocklist = []
     for blocker, blocked, block_level, reason, first_seen, last_seen in result:
         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
     blocklist = []
     for blocker, blocked, block_level, reason, first_seen, last_seen in result:
         first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
-        if reason == None or reason == '':
+        if reason is None or reason == '':
             reason = "No reason provided."
         else:
             reason = "Provided reason: '" + reason + "'"
             reason = "No reason provided."
         else:
             reason = "Provided reason: '" + reason + "'"
@@ -266,7 +261,8 @@ def rss(request: Request, domain: str = None):
             "blocked"    : blocked,
             "block_level": block_level,
             "reason"     : reason,
             "blocked"    : blocked,
             "block_level": block_level,
             "reason"     : reason,
-            "first_seen" : first_seen
+            "first_seen" : first_seen,
+            "last_seen"  : last_seen,
         })
 
     return templates.TemplateResponse("rss.xml", {
         })
 
     return templates.TemplateResponse("rss.xml", {