import re
import validators
-from fba import *
+from fba import config
+from fba import fba
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
templates = Jinja2Templates(directory="templates")
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
def top(blocked: int = None, blockers: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
- if blocked != None:
+ if blocked is not None:
if blocked > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT blocked, COUNT(blocked) FROM blocks WHERE block_level = 'reject' GROUP BY blocked ORDER BY COUNT(blocked) DESC LIMIT ?", [blocked])
- elif blockers != None:
+ elif blockers is not None:
if blockers > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT blocker, COUNT(blocker) FROM blocks WHERE block_level = 'reject' GROUP BY blocker ORDER BY COUNT(blocker) DESC LIMIT ?", [blockers])
- elif reference != None:
+ elif reference is not None:
if reference > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT origin, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY origin ORDER BY COUNT(domain) DESC LIMIT ?", [reference])
- elif software != None:
+ elif software is not None:
if software > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT software, COUNT(domain) FROM instances WHERE software IS NOT NULL GROUP BY software ORDER BY COUNT(domain) DESC, software ASC LIMIT ?", [software])
- elif originator != None:
+ elif originator is not None:
if originator > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT originator, COUNT(domain) FROM instances WHERE originator IS NOT NULL GROUP BY originator ORDER BY COUNT(domain) DESC, originator ASC LIMIT ?", [originator])
- elif error_code != None:
+ elif error_code is not None:
if error_code > 500:
raise HTTPException(status_code=400, detail="Too many results")
fba.cursor.execute("SELECT last_status_code, COUNT(domain) AS cnt FROM instances WHERE last_status_code IS NOT NULL AND last_status_code != '200' GROUP BY last_status_code ORDER BY cnt DESC LIMIT ?", [error_code])
@router.get(config.get("base_url") + "/api/index.json", response_class=JSONResponse)
def blocked(domain: str = None, reason: str = None, reverse: str = None):
- if domain == None and reason == None and reverse == None:
+ if domain is None and reason is None and reverse is None:
raise HTTPException(status_code=400, detail="No filter specified")
- if reason != None:
+ if reason is not None:
reason = re.sub("(%|_)", "", reason)
if len(reason) < 3:
raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
- if domain != None:
+ if domain is not None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen ASC",
(domain, "*." + domain, wildchar, fba.get_hash(domain), punycode, "*." + punycode))
- elif reverse != None:
+ elif reverse is not None:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
else:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
result = {}
for blocker, blocked, block_level, reason, first_seen, last_seen in blocklist:
- if reason != None and reason != "":
+ if reason is not None and reason != "":
reason = reason.replace(",", " ").replace(" ", " ")
entry = {
return JSONResponse(status_code=200, content={})
@router.get(config.get("base_url") + "/scoreboard")
-def index(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
- if blockers != None and blockers > 0:
+def scoreboard(request: Request, blockers: int = None, blocked: int = None, reference: int = None, software: int = None, originator: int = None, error_code: int = None):
+ if blockers is not None and blockers > 0:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blockers={blockers}")
- elif blocked != None and blocked > 0:
+ elif blocked is not None and blocked > 0:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?blocked={blocked}")
- elif reference != None and reference > 0:
+ elif reference is not None and reference > 0:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?reference={reference}")
- elif software != None and software > 0:
+ elif software is not None and software > 0:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?software={software}")
- elif originator != None and originator > 0:
+ elif originator is not None and originator > 0:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?originator={originator}")
- elif error_code != None and error_code > 0:
+ elif error_code is not None and error_code > 0:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?error_code={error_code}")
else:
raise HTTPException(status_code=400, detail="No filter specified")
- if response == None:
+ if response is None:
raise HTTPException(status_code=500, detail="Could not determine scores")
elif not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)
})
@router.get(config.get("base_url") + "/top")
-def index(request: Request, domain: str = None, reason: str = None, reverse: str = None):
+def top(request: Request, domain: str = None, reason: str = None, reverse: str = None):
if domain == "" or reason == "" or reverse == "":
raise HTTPException(status_code=500, detail="Insufficient parameter provided")
info = response.json()
response = None
- if domain != None:
+ if domain is not None:
if not validators.domain(domain):
raise HTTPException(status_code=500, detail="Invalid domain")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?domain={domain}")
- elif reason != None:
+ elif reason is not None:
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reason={reason}")
- elif reverse != None:
+ elif reverse is not None:
if not validators.domain(reverse):
raise HTTPException(status_code=500, detail="Invalid domain")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/index.json?reverse={reverse}")
- if response != None:
+ if response is not None:
if not response.ok:
raise HTTPException(status_code=response.status_code, detail=response.text)
blocklist = response.json()
@router.get(config.get("base_url") + "/rss")
def rss(request: Request, domain: str = None):
- if domain != None:
+ if domain is not None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? OR blocked = ? ORDER BY first_seen DESC LIMIT 50",
blocklist = []
for blocker, blocked, block_level, reason, first_seen, last_seen in result:
first_seen = utils.format_datetime(datetime.fromtimestamp(first_seen))
- if reason == None or reason == '':
+ if reason is None or reason == '':
reason = "No reason provided."
else:
reason = "Provided reason: '" + reason + "'"
"blocked" : blocked,
"block_level": block_level,
"reason" : reason,
- "first_seen" : first_seen
+ "first_seen" : first_seen,
+ "last_seen" : last_seen,
})
return templates.TemplateResponse("rss.xml", {