# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-from fastapi import FastAPI, Request, HTTPException, responses, Query
+from fastapi import Request, HTTPException, responses, Query
from fastapi.templating import Jinja2Templates
from datetime import datetime
from email import utils
+import fastapi
import uvicorn
import requests
-import sqlite3
import re
import fba
-app = FastAPI(docs_url=fba.config["base_url"] + "/docs", redoc_url=fba.config["base_url"] + "/redoc")
+app = fastapi.FastAPI(docs_url=fba.config["base_url"] + "/docs", redoc_url=fba.config["base_url"] + "/redoc")
templates = Jinja2Templates(directory=".")
@app.get(fba.config["base_url"] + "/api/info")
def blocked(domain: str = None, reason: str = None, reverse: str = None):
if domain == None and reason == None and reverse == None:
raise HTTPException(status_code=400, detail="No filter specified")
+
if reason != None:
reason = re.sub("(%|_)", "", reason)
if len(reason) < 3:
raise HTTPException(status_code=400, detail="Keyword is shorter than three characters")
+
if domain != None:
wildchar = "*." + ".".join(domain.split(".")[-domain.count("."):])
punycode = domain.encode('idna').decode('utf-8')
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE blocker = ? ORDER BY first_seen ASC", [reverse])
else:
fba.cursor.execute("SELECT blocker, blocked, block_level, reason, first_seen, last_seen FROM blocks WHERE reason like ? AND reason != '' ORDER BY first_seen ASC", ["%" + reason + "%"])
+
blocks = fba.cursor.fetchall()
result = {}
for blocker, blocked, block_level, reason, first_seen, last_seen in blocks:
entry = {
- "blocker" : blocker,
- "blocked" : blocked,
- "reason" : reason,
+ "blocker" : blocker,
+ "blocked" : blocked,
+ "reason" : reason,
"first_seen": first_seen,
- "last_seen" : last_seen
+ "last_seen" : last_seen
}
if block_level in result:
result[block_level].append(entry)
scores = None
if blockers != None and blockers > 0:
- res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api/top?blockers={blockers}")
+ res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?blockers={blockers}")
elif blocked != None and blocked > 0:
- res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api/top?blocked={blocked}")
+ res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?blocked={blocked}")
elif reference != None and reference > 0:
- res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api/top?reference={reference}")
+ res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?reference={reference}")
elif software != None and software > 0:
- res = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api/top?software={software}")
+ res = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/top?software={software}")
else:
raise HTTPException(status_code=400, detail="No filter specified")
blocks = None
if domain == None and reason == None and reverse == None:
- info = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api/info")
+ info = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api/info")
if not info.ok:
raise HTTPException(status_code=info.status_code, detail=info.text)
info = info.json()
elif domain != None:
- blocks = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api?domain={domain}")
+ blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?domain={domain}")
elif reason != None:
- blocks = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api?reason={reason}")
+ blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?reason={reason}")
elif reverse != None:
- blocks = requests.get(f"http://127.0.0.1:{fba.config['port']}{fba.config['base_url']}/api?reverse={reverse}")
+ blocks = requests.get(f"http://{fba.config['host']}:{fba.config['port']}{fba.config['base_url']}/api?reverse={reverse}")
if blocks != None:
if not blocks.ok:
"AND block_level = 'reject' " \
"LIMIT 1",
{
- "a": domains[0],
- "b": domains[1],
+ "a" : domains[0],
+ "b" : domains[1],
"aw": "*." + domains[0],
"bw": "*." + domains[1],
},
reason = "Provided reason: '" + reason + "'"
result.append({
- "blocker": blocker,
- "blocked": blocked,
+ "blocker" : blocker,
+ "blocked" : blocked,
"block_level": block_level,
- "reason": reason,
- "first_seen": first_seen
+ "reason" : reason,
+ "first_seen" : first_seen
})
timestamp = utils.format_datetime(datetime.now())
return templates.TemplateResponse("rss.xml", {
- "request": request,
+ "request" : request,
"timestamp": timestamp,
- "domain": domain,
- "blocks": result
+ "domain" : domain,
+ "blocks" : result
}, headers={
"Content-Type": "application/rss+xml"
})
if __name__ == "__main__":
- uvicorn.run("api:app", host="127.0.0.1", port=fba.config["port"], log_level=fba.config["log_level"])
+ uvicorn.run("api:app", host=fba.config["host"], port=fba.config["port"], log_level=fba.config["log_level"])