logger.debug("blocker='%s',software='%s'", blocker, software)
if software == "pleroma":
logger.info("blocker='%s',software='%s'", blocker, software)
- blocking = pleroma.fetch_blocks(blocker, nodeinfo_url)
+ blocking = pleroma.fetch_blocks(blocker)
logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "mastodon":
logger.info("blocker='%s',software='%s'", blocker, software)
- blocking = mastodon.fetch_blocks(blocker, nodeinfo_url)
+ blocking = mastodon.fetch_blocks(blocker)
logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "lemmy":
logger.info("blocker='%s',software='%s'", blocker, software)
- blocking = lemmy.fetch_blocks(blocker, nodeinfo_url)
+ blocking = lemmy.fetch_blocks(blocker)
logger.debug("blocker='%s' returned %d entries,software='%s'", blocker, len(blocking), software)
elif software == "friendica":
logger.info("blocker='%s',software='%s'", blocker, software)
logger.debug("Invoking instances.set_total_blocks(%s, %d) ...", blocker, len(blocking))
instances.set_total_blocks(blocker, blocking)
- logger.info("Checking %d entries from blocker='%s',software='%s' ...", len(blocking), blocker, software)
blockdict = list()
+
+ logger.info("Checking %d entries from blocker='%s',software='%s' ...", len(blocking), blocker, software)
for block in blocking:
logger.debug("blocked='%s',block_level='%s',reason='%s'", block["blocked"], block["block_level"], block["reason"])
if len(blocking) == 0:
if row["software"] == "pleroma":
logger.debug("domain='%s',software='%s'", row["domain"], row["software"])
- blocking = pleroma.fetch_blocks(row["domain"], row["nodeinfo_url"])
+ blocking = pleroma.fetch_blocks(row["domain"])
elif row["software"] == "mastodon":
logger.debug("domain='%s',software='%s'", row["domain"], row["software"])
- blocking = mastodon.fetch_blocks(row["domain"], row["nodeinfo_url"])
+ blocking = mastodon.fetch_blocks(row["domain"])
elif row["software"] == "lemmy":
logger.debug("domain='%s',software='%s'", row["domain"], row["software"])
- blocking = lemmy.fetch_blocks(row["domain"], row["nodeinfo_url"])
+ blocking = lemmy.fetch_blocks(row["domain"])
elif row["software"] == "friendica":
logger.debug("domain='%s',software='%s'", row["domain"], row["software"])
blocking = friendica.fetch_blocks(row["domain"])
logger.debug("software='%s' - EXIT!", software)
return software
-def determine_software(domain: str, path: str = None, nodeinfo_url: str = None) -> str:
- logger.debug("domain='%s',path='%s',nodeinfo_url='%s' - CALLED!", domain, path, nodeinfo_url)
+def determine_software(domain: str, path: str = None) -> str:
+ logger.debug("domain='%s',path='%s' - CALLED!", domain, path)
domain_helper.raise_on(domain)
if not isinstance(path, str) and path is not None:
raise ValueError(f"Parameter path[]='{type(path)}' is not of type 'str'")
- elif not isinstance(nodeinfo_url, str) and nodeinfo_url is not None:
- raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
- logger.debug("Fetching nodeinfo from domain='%s',path='%s',nodeinfo_url='%s' ...", domain, path, nodeinfo_url)
- data = nodeinfo.fetch(domain, path, nodeinfo_url)
+ logger.debug("Fetching nodeinfo from domain='%s',path='%s' ...", domain, path)
+ data = nodeinfo.fetch(domain, path)
software = None
logger.debug("data[%s]='%s'", type(data), data)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
-def fetch(domain: str, path: str = None, nodeinfo_url: str = None) -> dict:
- logger.debug("domain='%s',path='%s',nodeinfo_url='%s' - CALLED!", domain, path, nodeinfo_url)
+def fetch(domain: str, path: str = None, update_mode: bool = True) -> dict:
+ logger.debug("domain='%s',path='%s',update_mode='%s' - CALLED!", domain, path, update_mode)
domain_helper.raise_on(domain)
if not isinstance(path, str) and path is not None:
raise ValueError(f"Parameter path[]='{type(path)}' is not of type 'str'")
- elif not isinstance(nodeinfo_url, str) and nodeinfo_url is not None:
- raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
+ elif not isinstance(update_mode, bool) and update_mode is not None:
+ raise ValueError(f"Parameter update_mode[]='{type(update_mode)}' is not of type 'bool'")
- logger.debug("nodeinfo_url='%s'", nodeinfo_url)
- is_url = nodeinfo_url is not None and not nodeinfo_url.startswith("/") and validators.url(nodeinfo_url)
-
- logger.debug("is_url='%s'", is_url)
- if not is_url:
+ if path is None and update_mode:
logger.debug("Fetching well-known nodeinfo from domain='%s' ...", domain)
data = fetch_wellknown_nodeinfo(domain)
https_url = f"https://{domain}{str(path) if path is not None else '/'}"
logger.debug("path[%s]='%s',request='%s',http_url='%s',https_url='%s'", type(path), path, request, http_url, https_url)
- if (path is None and nodeinfo_url is None) or path in [request, http_url, https_url] or (is_url and nodeinfo_url.endswith(request)):
+ if path is None or path in [request, http_url, https_url]:
logger.debug("Fetching request='%s' from domain='%s' ...", request, domain)
data = network.get_json_api(
domain,
if "error_message" not in data and "json" in data:
logger.debug("Success: request='%s' - Setting detection_mode=STATIC_CHECK ...", request)
instances.set_last_nodeinfo(domain)
- instances.set_detection_mode(domain, "STATIC_CHECK")
- instances.set_nodeinfo_url(domain, "https://{domain}{request}")
+ if update_mode:
+ instances.set_detection_mode(domain, "STATIC_CHECK")
+ instances.set_nodeinfo_url(domain, "https://{domain}{request}")
break
logger.warning("Failed fetching nodeinfo from domain='%s',status_code='%s',error_message='%s'", domain, data['status_code'], data['error_message'])
logger.debug("peers()=%d - EXIT!", len(peers))
return peers
-def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
- logger.debug("domain='%s,nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
+def fetch_blocks(domain: str) -> list:
+ logger.debug("domain='%s - CALLED!", domain)
domain_helper.raise_on(domain)
- if not isinstance(nodeinfo_url, str):
- raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
- elif nodeinfo_url == "":
- raise ValueError("Parameter 'nodeinfo_url' is empty")
-
translations = [
"Blocked Instances".lower(),
"Instàncies bloquejades".lower(),
"followers_only": blocklist["Limited servers"] + blocklist["Silenced servers"],
}
-def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
- logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
+def fetch_blocks(domain: str) -> list:
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- if not isinstance(nodeinfo_url, str):
- raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
- elif nodeinfo_url == "":
- raise ValueError("Parameter 'nodeinfo_url' is empty")
-
blocklist = list()
logger.debug("Invoking federation.fetch_blocks(%s) ...", domain)
"removal from \"the whole known network\" timeline": "federated_timeline_removal",
}
-def fetch_blocks(domain: str, nodeinfo_url: str) -> list:
- logger.debug("domain='%s',nodeinfo_url='%s' - CALLED!", domain, nodeinfo_url)
+def fetch_blocks(domain: str) -> list:
+ logger.debug("domain='%s' - CALLED!", domain)
domain_helper.raise_on(domain)
- if not isinstance(nodeinfo_url, str):
- raise ValueError(f"Parameter nodeinfo_url[]='{type(nodeinfo_url)}' is not of type 'str'")
- elif nodeinfo_url == "":
- raise ValueError("Parameter 'nodeinfo_url' is empty")
-
blockdict = list()
rows = None
try:
- logger.debug("Fetching nodeinfo: domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
- rows = nodeinfo.fetch(domain, nodeinfo_url=nodeinfo_url)
+ logger.debug("Fetching nodeinfo: domain='%s'", domain)
+ rows = nodeinfo.fetch(domain, update_mode=False)
if "error_message" in rows:
- logger.warning("Error message '%s' during fetching nodeinfo for domain='%s',nodeinfo_url='%s'", rows["error_message"], domain, nodeinfo_url)
+ logger.warning("Error message '%s' during fetching nodeinfo for domain='%s'", rows["error_message"], domain)
instances.set_last_error(domain, rows)
instances.update(domain)
logger.debug("Returning empty list ... - EXIT!")
return list()
elif "exception" in rows:
- logger.warning("Exception '%s' during fetching nodeinfo for domain='%s',nodeinfo_url='%s' - EXIT!", type(rows["exception"]), domain, nodeinfo_url)
+ logger.warning("Exception '%s' during fetching nodeinfo for domain='%s' - EXIT!", type(rows["exception"]), domain)
return list()
elif "json" in rows:
- logger.debug("rows[json] found for domain='%s',nodeinfo_url='%s'", domain, nodeinfo_url)
+ logger.debug("rows[json] found for domain='%s'", domain)
rows = rows["json"]
except network.exceptions as exception: