elif amount > config.get("api_limit"):
raise HTTPException(status_code=500, detail=f"amount={amount} is to big")
- domain = wildchar = punycode = reason = None
-
if mode in ("detection_mode", "software", "command"):
database.cursor.execute(
f"SELECT domain, origin, software, detection_mode, command, total_peers, total_blocks, first_seen, last_updated \
domainlist = list()
if response is not None and response.ok:
domainlist = response.json()
- format = config.get("timestamp_format")
+ tformat = config.get("timestamp_format")
for row in domainlist:
- row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(format)
- row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(format) if isinstance(row["last_updated"], float) else None
+ row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(tformat)
+ row["last_updated"] = datetime.utcfromtimestamp(row["last_updated"]).strftime(tformat) if isinstance(row["last_updated"], float) else None
return templates.TemplateResponse("views/list.html", {
"request" : request,
if response.ok and response.status_code == 200 and len(response.text) > 0:
blocklist = response.json()
- format = config.get("timestamp_format")
+ tformat = config.get("timestamp_format")
for block_level in blocklist:
for row in blocklist[block_level]:
- row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(format)
- row["last_seen"] = datetime.utcfromtimestamp(row["last_seen"]).strftime(format) if isinstance(row["last_seen"], float) else None
+ row["first_seen"] = datetime.utcfromtimestamp(row["first_seen"]).strftime(tformat)
+ row["last_seen"] = datetime.utcfromtimestamp(row["last_seen"]).strftime(tformat) if isinstance(row["last_seen"], float) else None
found = found + 1
return templates.TemplateResponse("views/top.html", {
domain_data = response.json()
# Format timestamps
- format = config.get("timestamp_format")
+ tformat = config.get("timestamp_format")
instance = dict()
for key in domain_data.keys():
if key in ["last_nodeinfo", "last_blocked", "first_seen", "last_updated", "last_instance_fetch"] and isinstance(domain_data[key], float):
# Timestamps
- instance[key] = datetime.utcfromtimestamp(domain_data[key]).strftime(format)
+ instance[key] = datetime.utcfromtimestamp(domain_data[key]).strftime(tformat)
else:
# Generic
instance[key] = domain_data[key]
instances.set_has_obfuscation(blocker, False)
blocking = list()
- blockdict = list()
if software == "pleroma":
logger.info("blocker='%s',software='%s'", blocker, software)
blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", row["domain"], len(blocking))
instances.set_total_blocks(row["domain"], blocking)
- logger.info("Checking %d block(s) from domain='%s' ...", len(blocking), row["domain"])
obfuscated = 0
blockdict = list()
+
+ logger.info("Checking %d block(s) from domain='%s' ...", len(blocking), row["domain"])
for block in blocking:
logger.debug("block[blocked]='%s'", block["blocked"])
blocked = None
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if not response.ok or response.status_code >= 300 or len(response.content) == 0:
- logger.warning("Failed fetching url='%s': response.ok='%s',response.status_code=%d,response.content()=%d - EXIT!", response.ok, response.status_code, len(response.text))
+ logger.warning("Failed fetching url='%s': response.ok='%s',response.status_code=%d,response.content()=%d - EXIT!", url, response.ok, response.status_code, len(response.text))
return 1
reader = csv.DictReader(response.content.decode("utf-8").splitlines(), dialect="unix")
logger.debug("reader[]='%s'", type(reader))
- blockdict = list()
for row in reader:
logger.debug("row[]='%s'", type(row))
domain = tidyup.domain(row["hostname"])
logger.warning("domain='%s' is not wanted - SKIPPED!", domain)
continue
elif (args.all is None or not args.all) and instances.is_registered(domain):
- logger.debug("domain='%s' is already registered, --all not specified: args.all[]='%s'", type(args.all))
+ logger.debug("domain='%s' is already registered, --all not specified: args.all[]='%s'", domain, type(args.all))
continue
elif instances.is_recent(domain):
logger.debug("domain='%s' has been recently crawled - SKIPPED!", domain)
raise_on(domain)
if not isinstance(url, str):
- raise ValueError(f"Parameter url[]='%s' is not of type 'str'", type(url))
+ raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
elif url == "":
raise ValueError("Parameter 'url' is empty")
import logging
+from fba import utils
+
from fba.helpers import domain as domain_helper
from fba.http import federation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-def domain(domain: str, blocker: str, command: str) -> bool:
- logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
- domain_helper.raise_on(domain)
+def domain(name: str, blocker: str, command: str) -> bool:
+ logger.debug("name='%s',blocker='%s',command='%s' - CALLED!", name, blocker, command)
+ domain_helper.raise_on(name)
domain_helper.raise_on(blocker)
if not isinstance(command, str):
elif command == "":
raise ValueError("Parameter 'command' is empty")
- logger.debug("domain='%s' - BEFORE!", domain)
- domain = deobfuscate(domain, blocker)
+ logger.debug("name='%s' - BEFORE!", name)
+ name = utils.deobfuscate(name, blocker)
- logger.debug("domain='%s' - DEOBFUSCATED!", domain)
+ logger.debug("name='%s' - DEOBFUSCATED!", name)
if instances.has_pending(blocker):
logger.debug("Flushing updates for blocker='%s' ...", blocker)
instances.update_data(blocker)
- if not is_domain_wanted(domain):
- logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+ if not utils.is_domain_wanted(name):
+ logger.debug("name='%s' is not wanted - SKIPPED!", name)
return False
- elif instances.is_recent(domain):
- logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
+ elif instances.is_recent(name):
+ logger.debug("name='%s' has been recently checked - SKIPPED!", name)
return False
processed = False
try:
- logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
- federation.fetch_instances(domain, blocker, None, command)
+ logger.info("Fetching instances for name='%s',blocker='%s',command='%s' ...", name, blocker, command)
+ federation.fetch_instances(name, blocker, None, command)
processed = True
except network.exceptions as exception:
- logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
- instances.set_last_error(domain, exception)
+ logger.warning("Exception '%s' during fetching instances (%s) from name='%s'", type(exception), command, name)
+ instances.set_last_error(name, exception)
- logger.debug("Checking if domain='%s' has pending updates ...", domain)
- if instances.has_pending(domain):
- logger.debug("Flushing updates for domain='%s' ...", domain)
- instances.update_data(domain)
+ logger.debug("Checking if name='%s' has pending updates ...", name)
+ if instances.has_pending(name):
+ logger.debug("Flushing updates for name='%s' ...", name)
+ instances.update_data(name)
logger.debug("processed='%s' - EXIT!", processed)
return processed
from urllib.parse import urlparse
import bs4
+import requests
import validators
from fba import csrf
elif value == "":
raise ValueError("Parameter 'value' is empty")
elif not isinstance(column, str):
- raise columnError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+ raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
elif column == "":
- raise columnError("Parameter 'column' is empty")
+ raise ValueError("Parameter 'column' is empty")
# Query database
database.cursor.execute(
f"SELECT {column} FROM blocks WHERE {column} = ? LIMIT 1", [value]
)
- valid = database.cursor.fetchone() is not None
+ is_valid = database.cursor.fetchone() is not None
- logger.debug("valid='%s' - EXIT!", valid)
- return valid
+ logger.debug("is_valid='%s' - EXIT!", is_valid)
+ return is_valid
def translate_idnas(rows: list, column: str):
logger.debug("rows[]='%s' - CALLED!", type(rows))
if not isinstance(rows, list):
- raise ValueError(f"rows[]='%s' is not of type 'list'", type(rows))
+ raise ValueError(f"rows[]='{type(rows)}' is not of type 'list'")
elif len(rows) == 0:
raise ValueError("Parameter 'rows' is an empty list")
elif not isinstance(column, str):
- raise ValueError(f"column='%s' is not of type 'str'", type(column))
+ raise ValueError(f"column='{type(column)}' is not of type 'str'")
elif column == "":
raise ValueError("Parameter 'column' is empty")
elif column not in ["blocker", "blocked"]:
def alias_block_level(block_level: str) -> str:
logger.debug("block_level='%s' - CALLED!", block_level)
if not isinstance(block_level, str):
- raise ValueError(f"Parameter block_level[]='%s' is not of type 'str'", type(block_level))
+ raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
elif block_level == "":
raise ValueError("Parameter 'block_level' is empty")
def is_registered(domain: str, skip_raise = False) -> bool:
logger.debug("domain='%s',skip_raise='%s' - CALLED!", domain, skip_raise)
if not isinstance(skip_raise, bool):
- raise ValueError(f"skip_raise[]='%s' is not type of 'bool'", type(skip_raise))
+ raise ValueError(f"skip_raise[]='{type(skip_raise)}' is not type of 'bool'")
if not skip_raise:
domain_helper.raise_on(domain)
elif not char in domain:
raise ValueError(f"char='{char}' not found in domain='{domain}' but function invoked")
elif not isinstance(domain, str):
- raise ValueError(f"Parameter domain[]='%s'", type(domain))
+ raise ValueError(f"Parameter domain[]='{type(domain)}'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
elif not isinstance(blocked_hash, str) and blocked_hash is not None:
logger.debug("domain='%s' - AFTER!", domain)
if domain == "":
- debug.warning("domain is empty after tidyup - EXIT!")
+ logger.warning("domain is empty after tidyup - EXIT!")
return None
search = domain.replace(char, "_")
elif value == "":
raise ValueError("Parameter 'value' is empty")
elif not isinstance(column, str):
- raise columnError(f"Parameter column[]='{type(column)}' is not of type 'str'")
+ raise ValueError(f"Parameter column[]='{type(column)}' is not of type 'str'")
elif column == "":
- raise columnError("Parameter 'column' is empty")
+ raise ValueError("Parameter 'column' is empty")
# Query database
database.cursor.execute(
f"SELECT {column} FROM instances WHERE {column} = ? LIMIT 1", [value]
)
- valid = database.cursor.fetchone() is not None
+ is_valid = database.cursor.fetchone() is not None
- logger.debug("valid='%s' - EXIT!", valid)
- return valid
+ logger.debug("is_valid='%s' - EXIT!", is_valid)
+ return is_valid
def translate_idnas(rows: list, column: str):
logger.debug("rows[]='%s' - CALLED!", type(rows))
if not isinstance(rows, list):
- raise ValueError(f"rows[]='%s' is not of type 'list'", type(rows))
+ raise ValueError("rows[]='{type(rows)}' is not of type 'list'")
elif len(rows) == 0:
raise ValueError("Parameter 'rows' is an empty list")
elif not isinstance(column, str):
- raise ValueError(f"column='%s' is not of type 'str'", type(column))
+ raise ValueError(f"column='{type(column)}' is not of type 'str'")
elif column == "":
raise ValueError("Parameter 'column' is empty")
elif column not in ["domain", "origin"]:
logger.debug("source_domain='%s' - CALLED!", source_domain)
domain_helper.raise_on(source_domain)
- is_recent = False
+ recent = False
row = fetch(source_domain)
logger.debug("row[]='%s'", type(row))
if row is not None:
logger.debug("source_domain='%s',row[last_accessed]=%d", source_domain, row["last_accessed"])
- is_recent = (time.time() - row["last_accessed"]) <= config.get("source_last_access")
+ recent = (time.time() - row["last_accessed"]) <= config.get("source_last_access")
- logger.debug("is_recent='%s' - EXIT!", is_recent)
- return is_recent
+ logger.debug("recent='%s' - EXIT!", recent)
+ return recent
def update (source_domain: str):
logger.debug("source_domain='%s' - CALLED!", source_domain)
logger.debug("doc[]='%s',only='%s' - CALLED!")
if not isinstance(doc, bs4.BeautifulSoup):
raise ValueError(f"Parameter doc[]='{type(only)}' is not of type 'bs4.BeautifulSoup'")
- elif not isinstance(only, str) and only != None:
+ elif not isinstance(only, str) and only is not None:
raise ValueError(f"Parameter only[]='{type(only)}' is not of type 'str'")
elif isinstance(only, str) and only == "":
raise ValueError("Parameter 'only' is empty")