from fba.helpers import tidyup
from fba.models import blocks
+from fba.models import instances
router = fastapi.FastAPI(docs_url=config.get("base_url") + "/docs", redoc_url=config.get("base_url") + "/redoc")
router.mount(
return scores
+@router.get(config.get("base_url") + "/api/list.json", response_class=JSONResponse)
+def api_index(request: Request, mode: str, value: str, amount: int):
+ if mode is None or value is None or amount is None:
+ raise HTTPException(status_code=500, detail="No filter specified")
+ elif amount > config.get("api_limit"):
+ raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
+
+ domain = wildchar = punycode = reason = None
+
+ if mode == "detection_mode":
+ database.cursor.execute(
+ f"SELECT domain, origin, software, command, total_peers, total_blocks, first_seen, last_updated FROM instances WHERE {mode} = ? LIMIT ?", [value, amount]
+ )
+
+ domainlist = database.cursor.fetchall()
+
+ return domainlist
+
@router.get(config.get("base_url") + "/api/top.json", response_class=JSONResponse)
def api_index(request: Request, mode: str, value: str, amount: int):
if mode is None or value is None or amount is None:
"slogan" : config.get("slogan"),
})
-@router.get(config.get("base_url") + "/top")
-def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
- response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/info.json")
+@router.get(config.get("base_url") + "/list")
+def list_domains(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+ if mode == "detection_mode" and not instances.valid(value, "detection_mode"):
+ raise HTTPException(status_code=500, detail="Invalid detection mode provided")
- if not response.ok:
- raise HTTPException(status_code=response.status_code, detail=response.text)
- elif mode == "" or value == "" or amount == 0:
- raise HTTPException(status_code=500, detail="Parameter mode, value and amount must always be set")
- elif amount > config.get("api_limit"):
- raise HTTPException(status_code=500, detail=f"amount='{amount}' is to big")
+ response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/list.json?mode={mode}&value={value}&amount={amount}")
- info = response.json()
- blocklist = list()
+ domainlist = list()
+ if response is not None and response.ok:
+ domainlist = response.json()
+ format = config.get("timestamp_format")
+ for row in domainlist:
+ row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(format)
+ row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(format)
- if mode == "block_level" and not blocks.is_valid_level(value):
+ return templates.TemplateResponse("views/list.html", {
+ "request" : request,
+ "mode" : mode if response is not None else None,
+ "value" : value if response is not None else None,
+ "amount" : amount if response is not None else None,
+ "found" : len(domainlist),
+ "domainlist": domainlist,
+ "slogan" : config.get("slogan"),
+ "theme" : config.get("theme"),
+ })
+
+@router.get(config.get("base_url") + "/top")
+def top(request: Request, mode: str, value: str, amount: int = config.get("api_limit")):
+ if mode == "block_level" and not blocks.valid(value, "block_level"):
raise HTTPException(status_code=500, detail="Invalid block level provided")
elif mode in ["domain", "reverse"] and not utils.is_domain_wanted(value):
raise HTTPException(status_code=500, detail="Invalid or blocked domain specified")
response = requests.get(f"http://{config.get('host')}:{config.get('port')}{config.get('base_url')}/api/top.json?mode={mode}&value={value}&amount={amount}")
+ found = 0
+ blocklist = list()
if response is not None and response.ok:
blocklist = response.json()
- found = 0
- for block_level in blocklist:
- for block in blocklist[block_level]:
- block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(config.get("timestamp_format"))
- block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(config.get("timestamp_format"))
- found = found + 1
+ format = config.get("timestamp_format")
+ for block_level in blocklist:
+ for block in blocklist[block_level]:
+ block["first_seen"] = datetime.utcfromtimestamp(block["first_seen"]).strftime(format)
+ block["last_seen"] = datetime.utcfromtimestamp(block["last_seen"]).strftime(format)
+ found = found + 1
return templates.TemplateResponse("views/top.html", {
"request" : request,
"amount" : amount if response is not None else None,
"found" : found,
"blocklist": blocklist,
- "info" : info,
+ "slogan" : config.get("slogan"),
"theme" : config.get("theme"),
})
logger.debug("EXIT!")
-def is_valid_level(block_level: str) -> bool:
- logger.debug("block_level='%s' - CALLED!", block_level)
- if not isinstance(block_level, str):
- raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
- elif block_level == "":
- raise ValueError("Parameter 'block_level' is empty")
+def valid(value: str, column: str) -> bool:
+ logger.debug("value='%s' - CALLED!", value)
+ if not isinstance(value, str):
+ raise ValueError(f"Parameter value[]='{type(value)}' is not of type 'str'")
+ elif value == "":
+ raise ValueError("Parameter 'value' is empty")
+ elif not isinstance(column, str):
+ raise columnError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+ elif column == "":
+ raise columnError("Parameter 'column' is empty")
# Query database
database.cursor.execute(
- "SELECT block_level FROM blocks WHERE block_level = ? LIMIT 1", [block_level]
+ f"SELECT {column} FROM blocks WHERE {column} = ? LIMIT 1", [value]
)
valid = database.cursor.fetchone() is not None
if not isinstance(column, str):
raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
- elif column not in ["last_instance_fetch", "last_blocked"]:
+ elif column not in ["last_instance_fetch", "last_blocked", "last_nodeinfo"]:
raise ValueError(f"Parameter column='{column}' is not expected")
elif not is_registered(domain):
logger.debug("domain='%s' is not registered, returning False - EXIT!", domain)
# Set timestamp
_set_data("software", domain, software)
logger.debug("EXIT!")
+
+def valid(value: str, column: str) -> bool:
+ logger.debug("value='%s' - CALLED!", value)
+ if not isinstance(value, str):
+ raise ValueError(f"Parameter value[]='{type(value)}' is not of type 'str'")
+ elif value == "":
+ raise ValueError("Parameter 'value' is empty")
+ elif not isinstance(column, str):
+ raise columnError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+ elif column == "":
+ raise columnError("Parameter 'column' is empty")
+
+ # Query database
+ database.cursor.execute(
+ f"SELECT {column} FROM instances WHERE {column} = ? LIMIT 1", [value]
+ )
+
+ valid = database.cursor.fetchone() is not None
+
+ logger.debug("valid='%s' - EXIT!", valid)
+ return valid
--- /dev/null
+{% extends "base.html" %}
+
+{% block title %}{% if mode == 'detection_mode' %} - Detection mode {{value}}{% endif %}{% endblock %}
+
+{% block header %}
+ {% if mode == 'detection_mode' %}
+ <h1>Instances detected by method {{value}}</h1>
+ {% endif %}
+{% endblock %}
+
+{% block content %}
+ {% if amount == found %}
+ <div class="notice">
+ <h3>Maximum amount reached!</h3>
+ <div>
+ Please note that the maximum allowed amount is only returned, the blocker/blocked/reason might be more.
+ Paging support is not finished yet.
+ </div>
+ </div>
+ {% endif %}
+
+ <table class="table-with-rows">
+ <thead>
+ <th>Domain</th>
+ <th>Origin</th>
+ <th>Software</th>
+ <th>Command</th>
+ <th>First added</th>
+ <th>Last seen</th>
+ </thead>
+
+ <tbody>
+ {% for row in domainlist %}
+ <tr>
+ <td>
+ {% with domain=row['domain'] %}
+ {% include "widgets/links.html" %}
+ {% endwith %}
+ </td>
+ <td>
+ {% with domain=row['origin'] %}
+ {% include "widgets/links.html" %}
+ {% endwith %}
+ </td>
+ <td>{{row['software']}}</td>
+ <td><code>{{row['command']}}</code></td>
+ <td>{{row['first_seen']}}</td>
+ <td>{{row['last_seen']}}</td>
+ </tr>
+ {% endfor %}
+ </tbody>
+ </table>
+{% endblock %}
+
+{% block footer %}
+ <a href="{{base_url}}/">Index</a> /
+ {{ super() }}
+{% endblock %}