else:
raise HTTPException(status_code=400, detail="No filter specified")
- scores = list()
+ scores = []
for row in database.cursor.fetchall():
scores.append({
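Note: the recurring `list()` → `[]` and `dict()` → `{}` swaps in this changeset are the idiomatic CPython forms: a literal compiles to a single BUILD_LIST/BUILD_MAP opcode, while the constructor spelling needs a global name lookup plus a call. A minimal sketch to verify the difference yourself (prints total seconds for timeit's default 1,000,000 iterations; exact numbers vary by machine and are not measured from this codebase):

    import timeit

    # Literal syntax avoids the global lookup of the names 'list'/'dict'
    print(timeit.timeit("[]"))      # consistently faster
    print(timeit.timeit("list()"))
    print(timeit.timeit("{}"))
    print(timeit.timeit("dict()"))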
@router.get(config.get("base_url") + "/api/v1/instance/domain_blocks", response_class=JSONResponse)
def api_domain_blocks(request: Request) -> None:
blocked = blacklist.get_all()
- blocking = list()
+ blocking = []
for block in blocked:
blocking.append({
def api_peers(request: Request) -> None:
database.cursor.execute("SELECT domain FROM instances WHERE nodeinfo_url IS NOT NULL")
- peers = list()
+ peers = []
for row in database.cursor.fetchall():
peers.append(row["domain"])
timeout=config.timeout
)
- domainlist = list()
+ domainlist = []
if response is not None and response.ok:
domainlist = response.json()
tformat = config.get("timestamp_format")
)
found = 0
- blocklist = list()
+ blocklist = []
if response.ok and response.status_code == 200 and len(response.text) > 0:
blocklist = response.json()
# Format timestamps
tformat = config.get("timestamp_format")
- instance = dict()
+ instance = {}
for key in domain_data.keys():
if key in ["last_nodeinfo", "last_blocked", "first_seen", "last_updated", "last_instance_fetch"] and isinstance(domain_data[key], float):
# Timestamps
try:
logger.debug("Checking CSRF from source_domain='%s' ...", source_domain)
- headers = csrf.determine(source_domain, dict())
+ headers = csrf.determine(source_domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s) - EXIT!", type(exception), __name__)
- return list()
+ return []
try:
logger.info("Fetching JSON from pixelfed.org API, headers()=%d ...", len(headers))
logger.debug("source_domain='%s' has not been recently used, marking ...", source_domain)
sources.update(source_domain)
- domains = list()
+ domains = []
try:
logger.info("Fetching domainlist from source_domain='%s' ...", source_domain)
fetched = network.post_json_api(
logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", row["domain"], len(blocking))
instances.set_total_blocks(row["domain"], blocking)
- blockdict = list()
+ blockdict = []
deobfuscated = obfuscated = 0
logger.info("Checking %d entries from row[domain]='%s',row[software]='%s' ...", len(blocking), row["domain"], row["software"])
logger.debug("blocked='%s',block_level='%s',reason='%s'", block["blocked"], block["block_level"], block["reason"])
if block["block_level"] in [None, ""]:
- logger.warning("block_level='%s' is empty, row[domain]='%s',block[blocked]='%s'", block_level, block["blocker"], block["blocked"])
+ logger.warning("block[block_level]='%s' is empty, row[domain]='%s',block[blocked]='%s'", block["block_level"], block["blocker"], block["blocked"])
continue
logger.debug("block[blocked]='%s',block[reason]='%s' - BEFORE!", block["blocked"], block["reason"])
logger.debug("source_domain='%s' has not been recently used, marking ...", source_domain)
sources.update(source_domain)
- types = list()
+ types = []
if args.software is None:
logger.info("Fetching software list ...")
raw = network.fetch_url(
logger.info("Checking %d menu items ...", len(items))
for item in items:
- logger.debug("item[%s]='%s'", type(item), item)
+ logger.debug("item[%s]='%s' - BEFORE!", type(item), item)
domain = item.text.lower()
domain = tidyup.domain(domain) if domain is not None and len(domain) > 0 else None
+ logger.debug("domain='%s' - AFTER!", domain)
- logger.debug("domain='%s'", domain)
if domain is None:
- logger.debug("Skipping empty domain in tag='%s' - SKIPPED!", tag)
+ logger.debug("Skipping empty domain in item='%s' - SKIPPED!", item)
continue
elif domain == "all":
- logger.debug("Skipping 'All' menu entry in tag='%s' - SKIPPED!", tag)
+ logger.debug("Skipping 'All' menu entry in item='%s' - SKIPPED!", item)
continue
logger.debug("Appending domain='%s' ...", domain)
logger.debug("args.software='%s' does not match software='%s' - SKIPPED!", args.software, software)
continue
- items = list()
+ items = []
try:
logger.debug("Fetching table data for software='%s' ...", software)
raw = network.post_json_api(
sources.update(source_domain)
blocklist = {
- "silenced": list(),
- "rejected": list(),
+ "silenced": [],
+ "rejected": [],
}
logger.debug("Fetching domainblocks from source_domain='%s'", source_domain)
instances.set_last_blocked(blocker)
instances.set_total_blocks(blocker, blocking)
- blockdict = list()
+ blockdict = []
for block_level in blocklist:
logger.debug("block_level='%s'", block_level)
blockers = blocklist[block_level]
]
blocklist = {
- "silenced": list(),
- "rejected": list(),
+ "silenced": [],
+ "rejected": [],
}
source_domain = "meta.chaos.social"
logger.debug("blocklist[silenced]()=%d,blocklist[reject]()=%d", len(blocklist["silenced"]), len(blocklist["rejected"]))
if len(blocking) > 0:
- blockdict = list()
+ blockdict = []
for block_level in blocklist:
logger.info("block_level='%s' has %d row(s)", block_level, len(blocklist[block_level]))
def fetch_fba_rss(args: argparse.Namespace) -> int:
logger.debug("args[]='%s' - CALLED!", type(args))
- domains = list()
+ domains = []
logger.debug("Invoking locking.acquire() ...")
locking.acquire()
logger.debug("source_domain='%s' has not been recently used, marking ...", source_domain)
sources.update(source_domain)
- domains = list()
+ domains = []
logger.info("Fetching ATOM feed='%s' from FBA bot account ...", feed)
response = network.fetch_url(feed, network.web_headers, config.timeout)
locking.acquire()
# Init variables
- rows = list()
+ rows = []
# Is domain or software set?
if args.domain not in [None, ""]:
instances.set_total_blocks(row["domain"], blocking)
obfuscated = 0
- blockdict = list()
+ blockdict = []
logger.info("Checking %d block(s) from row[domain]='%s' ...", len(blocking), row["domain"])
for block in blocking:
locking.acquire()
source_domain = "instances.social"
- domains = list()
+ domains = []
if config.get("instances_social_api_key") == "":
logger.error("API key not set. Please set in your config.json file.")
"/relays"
)
- domains = list()
+ domains = []
logger.info("Checking %d row(s) ...", len(rows))
for row in rows:
logger.debug("Fetch all relay instances ...")
database.cursor.execute("SELECT domain, software, nodeinfo_url FROM instances WHERE software IN ('activityrelay', 'aoderelay', 'selective-relay', 'pub-relay') AND nodeinfo_url IS NOT NULL ORDER BY last_updated DESC")
- domains = list()
+ domains = []
rows = database.cursor.fetchall()
logger.info("Checking %d relays ...", len(rows))
logger.warning("row[domain]='%s' has empty nodeinfo_url but this is required - SKIPPED!", row["domain"])
continue
- peers = list()
+ peers = []
try:
logger.debug("row[domain]='%s',row[software]='%s' - checking ....", row["domain"], row["software"])
if row["software"] == "pub-relay":
raise ValueError(f"Parameter key[]='{type(key)}' is not of type 'str'")
elif not key_exists(key):
logger.debug("Cache for key='%s' not initialized.", key)
- _cache[key] = dict()
+ _cache[key] = {}
logger.debug("Setting %d row(s) for key='%s',value[%s]='%s' ...", len(rows), key, type(value), value)
for sub in rows:
if blacklist.is_blacklisted(domain):
raise ValueError(f"domain='{domain}' is blacklisted but function was invoked")
elif domain not in _cookies:
- return dict()
+ return {}
logger.debug("_cookies[%s]()=%d - EXIT!", domain, len(_cookies[domain]))
return _cookies[domain]
elif response.text.strip() != "" and not is_json_response(response):
logger.warning("response.headers[content-type]='%s' is not a JSON type, below json() invocation may raise an exception", response.headers.get("content-type"))
- data = list()
+ data = []
raw = response.text.strip()
logger.debug("raw()=%d", len(raw))
logger.debug("Setting last_blocked for blocker='%s' ...", blocker)
instances.set_last_blocked(blocker)
- domains = list()
+ domains = []
# Fetch this URL
logger.info("Fetching url='%s' for blocker='%s' ...", url, blocker)
rows = list(reader)
# Init local variables
- blockdict = list()
+ blockdict = []
cnt = 0
logger.info("Checking %d CSV lines ...", len(rows))
logger.debug("Updating last_instance_fetch for domain='%s' ...", domain)
instances.set_last_instance_fetch(domain)
- peerlist = list()
+ peerlist = []
logger.debug("software='%s'", software)
if software is not None:
try:
try:
logger.debug("Checking CSRF for domain='%s'", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
# Init peers variable
- peers = list()
+ peers = []
logger.debug("Checking %d API paths ...", len(_api_paths))
for path in _api_paths:
if not isinstance(peers, list):
logger.warning("peers[]='%s' is not of type 'list', maybe bad API response?", type(peers))
- peers = list()
+ peers = []
logger.debug("Invoking instances.set_total_peers(%s,%d) ...", domain, len(peers))
instances.set_total_peers(domain, peers)
logger.debug("software[%s]='%s' - EXIT!", type(software), software)
return software
-def find_domains(tag: bs4.element.Tag, domainColumn: str = "dt", reasonColumn: str = "dd", reasonText: str = "Categories:") -> list:
- logger.debug("tag[]='%s',domainColumn='%s',reasonColumn='%s',reasonText='%s' - CALLED!", type(tag), domainColumn, reasonColumn, reasonText)
+def find_domains(tag: bs4.element.Tag, domain_column: str = "dt", reason_column: str = "dd", reason_text: str = "Categories:") -> list:
+ logger.debug("tag[]='%s',domain_column='%s',reason_column='%s',reason_text='%s' - CALLED!", type(tag), domain_column, reason_column, reason_text)
if not isinstance(tag, bs4.element.Tag):
raise ValueError(f"Parameter tag[]='{type(tag)}' is not type of bs4.element.Tag")
- elif not isinstance(domainColumn, str):
- raise ValueError(f"Parameter domainColumn[]='{type(domainColumn)}' is not type of 'str'")
- elif domainColumn == "":
- raise ValueError("Parameter 'domainColumn' is an empty string")
- elif not isinstance(reasonColumn, str):
- raise ValueError(f"Parameter reasonColumn[]='{type(reasonColumn)}' is not type of 'str'")
- elif reasonColumn == "":
- raise ValueError("Parameter 'reasonColumn' is an empty string")
- elif len(tag.find_all(domainColumn)) == 0:
- raise KeyError("No domainColumn='{domainColumn}' rows found in table!")
- elif len(tag.find_all(reasonColumn)) == 0:
- raise KeyError("No reasonColumn='{reasonColumn}' rows found in table!")
- elif not isinstance(reasonText, str):
- raise ValueError(f"Parameter reasonText[]='{type(reasonText)}' is not type of 'str'")
- elif reasonText == "":
- raise ValueError("Parameter 'reasonText' is an empty string")
-
- domains = list()
- for element in tag.find_all(domainColumn):
+ elif not isinstance(domain_column, str):
+ raise ValueError(f"Parameter domain_column[]='{type(domain_column)}' is not type of 'str'")
+ elif domain_column == "":
+ raise ValueError("Parameter 'domain_column' is an empty string")
+ elif not isinstance(reason_column, str):
+ raise ValueError(f"Parameter reason_column[]='{type(reason_column)}' is not type of 'str'")
+ elif reason_column == "":
+ raise ValueError("Parameter 'reason_column' is an empty string")
+ elif len(tag.find_all(domain_column)) == 0:
+ raise KeyError("No domain_column='{domain_column}' rows found in table!")
+ elif len(tag.find_all(reason_column)) == 0:
+ raise KeyError("No reason_column='{reason_column}' rows found in table!")
+ elif not isinstance(reason_text, str):
+ raise ValueError(f"Parameter reason_text[]='{type(reason_text)}' is not type of 'str'")
+ elif reason_text == "":
+ raise ValueError("Parameter 'reason_text' is an empty string")
+
+ domains = []
+ for element in tag.find_all(domain_column):
logger.debug("element[%s]='%s'", type(element), element)
domain = tidyup.domain(element.text)
- reasons = element.find_next(reasonColumn).text.split(reasonText)[1].splitlines()
+ reasons = element.find_next(reason_column).text.split(reason_text)[1].splitlines()
+ logger.debug("domain='%s',reasons(%d)='%s'", domain, len(reasons), reasons)
- logger.debug("reasons(%d)='%s'", len(reasons), reasons)
reason = ""
for r in reasons:
logger.debug("r[%s]='%s'", type(r), r)
raise ValueError("Parameter 'rows' is empty")
# Init variables
- peers = list()
+ peers = []
for key in ["linked", "allowed", "blocked"]:
logger.debug("key='%s'", key)
raise Exception(f"domain='{domain}' is not registered but function is invoked")
# Init block list
- blocklist = list()
+ blocklist = []
# No CSRF by default, you don't have to add network.api_headers by yourself here
- headers = dict()
+ headers = {}
try:
logger.debug("Checking CSRF for domain='%s'", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_blocks,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
try:
# json endpoint for newer mastodongs
logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
-def post_json_api(domain: str, path: str, data: str = "", headers: dict = dict()) -> dict:
+def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict:
logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers))
domain_helper.raise_on(domain)
response = fetch_response(
components.netloc.split(":")[0],
components.path if isinstance(components.path, str) and components.path != '' else '/',
- headers,
- timeout
+ headers=headers,
+ timeout=timeout
)
logger.debug("response[]='%s' - EXIT!", type(response))
if "error_message" in fetched:
logger.warning("Error during fetching API result: '%s' - EXIT!", fetched["error_message"])
- return list()
+ return []
elif "exception" in fetched:
logger.warning("Exception '%s' during fetching API result - EXIT!", type(fetched["exception"]))
- return list()
+ return []
elif "json" not in fetched:
raise KeyError("fetched has no element 'json'")
- elif rows_key not in[None, ""] and rows_key not in fetched["json"]:
+ elif rows_key not in [None, ""] and rows_key not in fetched["json"]:
raise KeyError(f"fetched[row] has no element '{rows_key}'")
- elif rows_key == None:
+ elif rows_key is None:
logger.debug("Parameter 'rows_key' is not set, using whole fetched['json'] as rows ...")
rows = fetched["json"]
else:
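The `== None` → `is None` fix above is more than style: `is` compares identity against the `None` singleton, while `==` dispatches to `__eq__()`, which a class can override to claim equality with anything. PEP 8 mandates `is`/`is not` for singleton comparisons. A tiny illustration with a hypothetical class:

    class AlwaysEqual:
        def __eq__(self, other):
            return True  # claims equality with everything, including None

    obj = AlwaysEqual()
    print(obj == None)  # True - misleading
    print(obj is None)  # False - identity check is reliable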
# No CSRF by default, you don't have to add network.api_headers by yourself here
headers = tuple()
- data = dict()
+ data = {}
try:
logger.debug("Checking CSRF for domain='%s'", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
logger.debug("headers()=%d", len(headers))
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (nodeinfo,%s) - EXIT!", type(exception), __name__)
try:
logger.debug("Checking CSRF for domain='%s'", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_wellknown_nodeinfo,%s) - EXIT!", type(exception), __name__)
instances.set_last_error(domain, exception)
"exception" : exception,
}
- data = dict()
+ data = {}
logger.debug("Fetching .well-known info for domain='%s'", domain)
for path in _well_known_nodeinfo_urls:
return data
elif "json" not in data:
logger.warning("domain='%s' returned no 'json' key - EXIT!", domain)
- return dict()
+ return {}
infos = data["json"]
logger.debug("infos(%d)[]='%s' has been returned", len(infos), type(infos))
logger.debug("Checking %d nodeinfo ids ...", len(_nodeinfo_identifier))
for niid in _nodeinfo_identifier:
- data = dict()
+ data = {}
logger.debug("Checking niid='%s' for infos[links]()=%d ...", niid, len(infos["links"]))
for link in infos["links"]:
raise Exception(f"domain='{domain}' is blacklisted but function has been invoked")
sql_string = ""
- fields = list()
+ fields = []
logger.debug("Checking %d _pending array elements ...", len(_pending))
for key in _pending:
elif not instances.is_registered(domain):
raise Exception(f"domain='{domain}' is not registered but function is invoked")
- blocklist = list()
+ blocklist = []
block_tag = None
try:
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
logger.debug("block_tag[%s]='%s'", type(block_tag), block_tag)
if block_tag is None:
logger.debug("Instance has no block list: domain='%s' - EXIT!", domain)
- return list()
+ return []
table = block_tag.find("table")
logger.debug("table[]='%s'", type(table))
if table is None:
logger.warning("domain='%s' has no table tag - EXIT !", domain)
- return list()
+ return []
elif table.find("tbody"):
rows = table.find("tbody").find_all("tr")
else:
elif not instances.is_registered(domain):
raise Exception(f"domain='{domain}' is not registered but function is invoked")
- peers = list()
+ peers = []
# No CSRF by default, you don't have to add network.api_headers by yourself here
headers = tuple()
try:
logger.debug("Checking CSRF for domain='%s' ...", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
try:
logger.debug("Fetching '/api/v3/site' from domain='%s' ...", domain)
elif not instances.is_registered(domain):
raise Exception(f"domain='{domain}' is not registered but function is invoked")
- blocklist = list()
+ blocklist = []
try:
# json endpoint for newer mastodongs
if blacklist.is_blacklisted(domain):
raise Exception(f"domain='{domain}' is blacklisted but function is invoked")
- peers = list()
+ peers = []
try:
# json endpoint for newer mastodongs
raise ValueError("Parameter 'only' is empty")
scripts = doc.find_all("script")
- peers = list()
+ peers = []
logger.debug("scripts()=%d", len(scripts))
for script in scripts:
parsed = json.loads(iso_data)
except json.decoder.JSONDecodeError as exception:
logger.warning("Exception '%s' during parsing %d Bytes: '%s' - EXIT!", type(exception), len(iso_data), str(exception))
- return list()
+ return []
logger.debug("parsed[%s]()=%d", type(parsed), len(parsed))
logger.debug("doc[]='%s'", type(doc))
if doc is None:
logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
- return list()
+ return []
for header in doc.find_all("h3"):
header_text = tidyup.reason(header.text)
raise Exception(f"domain='{domain}' is not registered but function is invoked")
# Init variables
- blocklist = list()
+ blocklist = []
logger.debug("Invoking fetch_blocks_from_about(%s) ...", domain)
rows = fetch_blocks_from_about(domain)
raise Exception(f"domain='{domain}' is not registered but function is invoked")
logger.debug("domain='%s' is misskey, sending API POST request ...", domain)
- peers = list()
+ peers = []
offset = 0
step = config.get("misskey_limit")
try:
logger.debug("Checking CSRF for domain='%s'", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
# iterating through all "suspended" (follow-only in its terminology)
# instances page-by-page, since that troonware doesn't support
try:
logger.debug("Checking CSRF for domain='%s' ...", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_blocks,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
- blocklist = list()
+ blocklist = []
offset = 0
step = config.get("misskey_limit")
raise Exception(f"domain='{domain}' is not registered but function is invoked")
# Init variables
- peers = list()
+ peers = []
headers = tuple()
start = 0
try:
logger.debug("Checking CSRF for domain='%s' ...", domain)
- headers = csrf.determine(domain, dict())
+ headers = csrf.determine(domain, {})
except network.exceptions as exception:
logger.warning("Exception '%s' during checking CSRF (fetch_peers,%s)", type(exception), __name__)
instances.set_last_error(domain, exception)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
for mode in ["followers", "following"]:
logger.debug("domain='%s',mode='%s'", domain, mode)
raise Exception(f"domain='{domain}' is not registered but function is invoked")
# Init variables
- blockdict = list()
+ blockdict = []
rows = None
try:
instances.update(domain)
logger.debug("Returning empty list ... - EXIT!")
- return list()
+ return []
elif "exception" in rows:
logger.warning("Exception '%s' during fetching nodeinfo for domain='%s' - EXIT!", type(rows["exception"]), domain)
- return list()
+ return []
elif "json" in rows:
logger.debug("rows[json] found for domain='%s'", domain)
rows = rows["json"]
logger.debug("rows[]='%s'", type(rows))
if rows is None:
logger.warning("Could not fetch nodeinfo from domain='%s' - EXIT!", domain)
- return list()
+ return []
elif "metadata" not in rows:
logger.warning("rows()=%d does not have key 'metadata', domain='%s' - EXIT!", len(rows), domain)
- return list()
+ return []
elif "federation" not in rows["metadata"]:
logger.warning("rows()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain)
- return list()
+ return []
found = False
data = rows["metadata"]["federation"]
logger.debug("doc[]='%s'", type(doc))
if doc is None:
logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
- return list()
+ return []
headers = doc.find_all("h2")
logger.debug("headers[]='%s'", type(headers))
if headers is None:
logger.warning("Cannot fetch any /about pages for domain='%s' - EXIT!", domain)
- return list()
+ return []
logger.info("Checking %d headers ...", len(headers))
for header in headers:
elif search == "":
raise ValueError("Parameter 'search' is empty")
- domains = list()
+ domains = []
logger.debug("Parsing %d tags ...", len(tags))
for tag in tags:
logger.debug("tag[]='%s'", type(tag))