mastodon.fetch_blocks(blocker, origin, nodeinfo_url)
elif software == "friendica" or software == "misskey":
print(f"INFO: blocker='{blocker}',software='{software}'")
- try:
- if software == "friendica":
- rows = friendica.fetch_blocks(blocker)
- elif software == "misskey":
- rows = misskey.fetch_blocks(blocker)
-
- print(f"INFO: Checking {len(rows.items())} entries from blocker='{blocker}',software='{software}' ...")
- for block_level, blocklist in rows.items():
- # DEBUG: print("DEBUG: blocker,block_level,blocklist():", blocker, block_level, len(blocklist))
- block_level = fba.tidyup_domain(block_level)
- # DEBUG: print("DEBUG: AFTER-block_level:", block_level)
- if block_level == "":
- print("WARNING: block_level is empty, blocker:", blocker)
+ if software == "friendica":
+ rows = friendica.fetch_blocks(blocker)
+ elif software == "misskey":
+ rows = misskey.fetch_blocks(blocker)
+
+ print(f"INFO: Checking {len(rows.items())} entries from blocker='{blocker}',software='{software}' ...")
+ for block_level, blocklist in rows.items():
+ # DEBUG: print("DEBUG: blocker,block_level,blocklist():", blocker, block_level, len(blocklist))
+ block_level = fba.tidyup_domain(block_level)
+ # DEBUG: print("DEBUG: AFTER-block_level:", block_level)
+ if block_level == "":
+ print("WARNING: block_level is empty, blocker:", blocker)
+ continue
+
+ # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
+ for block in blocklist:
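+ # Assumption: each block entry is a two-key dict, so .values() yields (blocked, reason) in insertion order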
+ blocked, reason = block.values()
+ # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
+ blocked = fba.tidyup_domain(blocked)
+ reason = fba.tidyup_reason(reason) if reason is not None and reason != "" else None
+ # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
+
+ if blocked == "":
+ print("WARNING: blocked is empty:", blocker)
continue
+ elif blacklist.is_blacklisted(blocked):
+ # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
+ continue
+ elif blocked.count("*") > 0:
+ # Some Friendica servers obfuscate domains with asterisks instead of hashing them
+ fba.cursor.execute(
+ "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
+ )
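+ # In SQL LIKE, "_" matches exactly one character, so e.g. "ex*mple.com" becomes the pattern "ex_mple.com"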
- # DEBUG: print(f"DEBUG: Checking {len(blocklist)} entries from blocker='{blocker}',software='{software}',block_level='{block_level}' ...")
- for block in blocklist:
- blocked, reason = block.values()
- # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - BEFORE!")
- blocked = fba.tidyup_domain(blocked)
- reason = fba.tidyup_reason(reason) if reason is not None and reason != "" else None
- # DEBUG: print(f"DEBUG: blocked='{blocked}',reason='{reason}' - AFTER!")
+ searchres = fba.cursor.fetchone()
- if blocked == "":
- print("WARNING: blocked is empty:", blocker)
- continue
- elif blacklist.is_blacklisted(blocked):
- # DEBUG: print(f"DEBUG: blocked='{blocked}' is blacklisted - skipping!")
- continue
- elif blocked.count("*") > 0:
- # Some friendica servers also obscure domains without hash
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("*", "_")]
- )
-
- searchres = fba.cursor.fetchone()
-
- if searchres is None:
- print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
- continue
-
- blocked = searchres[0]
- origin = searchres[1]
- nodeinfo_url = searchres[2]
- elif blocked.count("?") > 0:
- # Some obscure them with question marks, not sure if that's dependent on version or not
- fba.cursor.execute(
- "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("?", "_")]
- )
-
- searchres = fba.cursor.fetchone()
-
- if searchres is None:
- print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
- continue
-
- blocked = searchres[0]
- origin = searchres[1]
- nodeinfo_url = searchres[2]
- elif not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ if searchres is None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
continue
- # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
- if not validators.domain(blocked):
- print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ blocked = searchres[0]
+ origin = searchres[1]
+ nodeinfo_url = searchres[2]
+ elif blocked.count("?") > 0:
+ # Some servers obfuscate domains with question marks instead; unclear whether that depends on the server version
+ fba.cursor.execute(
+ "SELECT domain, origin, nodeinfo_url FROM instances WHERE domain LIKE ? ORDER BY rowid LIMIT 1", [blocked.replace("?", "_")]
+ )
+
+ searchres = fba.cursor.fetchone()
+
+ if searchres is None:
+ print(f"WARNING: Cannot deobsfucate blocked='{blocked}' - SKIPPED!")
continue
- elif not instances.is_registered(blocked):
- # DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
- instances.add(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
-
- if not blocks.is_instance_blocked(blocker, blocked, block_level):
- blocks.add_instance(blocker, blocked, reason, block_level)
-
- if block_level == "reject":
- blockdict.append({
- "blocked": blocked,
- "reason" : reason
- })
- else:
- # DEBUG: print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocked='{blocked}' ...")
- blocks.update_last_seen(blocker, blocked, block_level)
- blocks.update_reason(reason, blocker, blocked, block_level)
-
- # DEBUG: print("DEBUG: Committing changes ...")
- fba.connection.commit()
- except BaseException as exception:
- print(f"ERROR: blocker='{blocker}',software='{software}',exception[{type(exception)}]:'{str(exception)}'")
+
+ blocked = searchres[0]
+ origin = searchres[1]
+ nodeinfo_url = searchres[2]
+ elif not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ continue
+
+ # DEBUG: print("DEBUG: Looking up instance by domain:", blocked)
+ if not validators.domain(blocked):
+ print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
+ continue
+ elif not instances.is_registered(blocked):
+ # DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
+ instances.add(blocked, blocker, inspect.currentframe().f_code.co_name, nodeinfo_url)
+
+ if not blocks.is_instance_blocked(blocker, blocked, block_level):
+ blocks.add_instance(blocker, blocked, reason, block_level)
+
+ if block_level == "reject":
+ blockdict.append({
+ "blocked": blocked,
+ "reason" : reason
+ })
+ else:
+ # DEBUG: print(f"DEBUG: Updating block last seen and reason for blocker='{blocker}',blocked='{blocked}' ...")
+ blocks.update_last_seen(blocker, blocked, block_level)
+ blocks.update_reason(reason, blocker, blocked, block_level)
+
+ # DEBUG: print("DEBUG: Committing changes ...")
+ fba.connection.commit()
else:
print("WARNING: Unknown software:", blocker, software)
"reject" : list(),
}
- try:
- raw = fba.fetch_url("https://raw.githubusercontent.com/chaossocial/meta/master/federation.md", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
- # DEBUG: print(f"DEBUG: raw()={len(raw)}[]={type(raw)}")
+ raw = fba.fetch_url("https://raw.githubusercontent.com/chaossocial/meta/master/federation.md", network.web_headers, (config.get("connection_timeout"), config.get("read_timeout"))).text
+ # DEBUG: print(f"DEBUG: raw()={len(raw)}[]={type(raw)}")
- doc = bs4.BeautifulSoup(markdown.markdown(raw, extensions=extensions), features='html.parser')
+ doc = bs4.BeautifulSoup(markdown.markdown(raw, extensions=extensions), features='html.parser')
- # DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
- silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table").find("tbody")
- # DEBUG: print(f"DEBUG: silenced[]={type(silenced)}")
- domains["silenced"] = domains["silenced"] + fba.find_domains(silenced)
+ # DEBUG: print(f"DEBUG: doc()={len(doc)}[]={type(doc)}")
+ silenced = doc.find("h2", {"id": "silenced-instances"}).findNext("table").find("tbody")
+ # DEBUG: print(f"DEBUG: silenced[]={type(silenced)}")
+ domains["silenced"] = domains["silenced"] + fba.find_domains(silenced)
- blocked = doc.find("h2", {"id": "blocked-instances"}).findNext("table").find("tbody")
- # DEBUG: print(f"DEBUG: blocked[]={type(blocked)}")
- domains["reject"] = domains["reject"] + fba.find_domains(blocked)
-
- except BaseException as exception:
- print(f"ERROR: Cannot fetch from meta.chaos.social,exception[{type(exception)}]:'{str(exception)}'")
- sys.exit(255)
+ blocked = doc.find("h2", {"id": "blocked-instances"}).findNext("table").find("tbody")
+ # DEBUG: print(f"DEBUG: blocked[]={type(blocked)}")
+ domains["reject"] = domains["reject"] + fba.find_domains(blocked)
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
# DEBUG: print(f"DEBUG: args[]={type(args)} - CALLED!")
domains = list()
- try:
- print(f"INFO: Fetch FBA-specific RSS args.feed='{args.feed}' ...")
- response = fba.fetch_url(args.feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ print(f"INFO: Fetch FBA-specific RSS args.feed='{args.feed}' ...")
+ response = fba.fetch_url(args.feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
- if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print(f"DEBUG: Parsing RSS feed ({len(response.text)} Bytes) ...")
- rss = atoma.parse_rss_bytes(response.content)
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ if response.ok and response.status_code < 300 and len(response.text) > 0:
+ # DEBUG: print(f"DEBUG: Parsing RSS feed ({len(response.text)} Bytes) ...")
+ rss = atoma.parse_rss_bytes(response.content)
- # DEBUG: print(f"DEBUG: rss[]={type(rss)}")
- for item in rss.items:
- # DEBUG: print(f"DEBUG: item={item}")
- domain = item.link.split("=")[1]
+ # DEBUG: print(f"DEBUG: rss[]={type(rss)}")
+ for item in rss.items:
+ # DEBUG: print(f"DEBUG: item={item}")
+ domain = item.link.split("=")[1]
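+ # Assumption: item.link carries the domain as its query value, e.g. "...?domain=example.com" (hypothetical URL shape)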
- if blacklist.is_blacklisted(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
- continue
- elif domain in domains:
- # DEBUG: print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
- continue
- elif instances.is_registered(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
- continue
-
- # DEBUG: print(f"DEBUG: Adding domain='{domain}'")
- domains.append(domain)
+ if blacklist.is_blacklisted(domain):
+ # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
+ continue
+ elif domain in domains:
+ # DEBUG: print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
+ continue
+ elif instances.is_registered(domain):
+ # DEBUG: print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
+ continue
- except BaseException as exception:
- print(f"ERROR: Cannot fetch args.feed='{args.feed}',exception[{type(exception)}]:'{str(exception)}'")
- sys.exit(255)
+ # DEBUG: print(f"DEBUG: Adding domain='{domain}'")
+ domains.append(domain)
# DEBUG: print(f"DEBUG: domains()={len(domains)}")
if len(domains) > 0:
feed = "https://ryona.agency/users/fba/feed.atom"
domains = list()
- try:
- print(f"INFO: Fetching ATOM feed='{feed}' from FBA bot account ...")
- response = fba.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
-
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
- if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print(f"DEBUG: Parsing ATOM feed ({len(response.text)} Bytes) ...")
- atom = atoma.parse_atom_bytes(response.content)
-
- # DEBUG: print(f"DEBUG: atom[]={type(atom)}")
- for entry in atom.entries:
- # DEBUG: print(f"DEBUG: entry[]={type(entry)}")
- doc = bs4.BeautifulSoup(entry.content.value, "html.parser")
- # DEBUG: print(f"DEBUG: doc[]={type(doc)}")
- for element in doc.findAll("a"):
- for href in element["href"].split(","):
- # DEBUG: print(f"DEBUG: href[{type(href)}]={href}")
- domain = fba.tidyup_domain(href)
-
- # DEBUG: print(f"DEBUG: domain='{domain}'")
- if blacklist.is_blacklisted(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
- continue
- elif domain in domains:
- # DEBUG: print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
- continue
- elif instances.is_registered(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
- continue
- # DEBUG: print(f"DEBUG: Adding domain='{domain}',domains()={len(domains)}")
- domains.append(domain)
+ print(f"INFO: Fetching ATOM feed='{feed}' from FBA bot account ...")
+ response = fba.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code='{response.status_code}',response.text()={len(response.text)}")
+ if response.ok and response.status_code < 300 and len(response.text) > 0:
+ # DEBUG: print(f"DEBUG: Parsing ATOM feed ({len(response.text)} Bytes) ...")
+ atom = atoma.parse_atom_bytes(response.content)
+
+ # DEBUG: print(f"DEBUG: atom[]={type(atom)}")
+ for entry in atom.entries:
+ # DEBUG: print(f"DEBUG: entry[]={type(entry)}")
+ doc = bs4.BeautifulSoup(entry.content.value, "html.parser")
+ # DEBUG: print(f"DEBUG: doc[]={type(doc)}")
+ for element in doc.findAll("a"):
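+ # A single href may pack several comma-separated domains, hence the inner split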
+ for href in element["href"].split(","):
+ # DEBUG: print(f"DEBUG: href[{type(href)}]={href}")
+ domain = fba.tidyup_domain(href)
+
+ # DEBUG: print(f"DEBUG: domain='{domain}'")
+ if blacklist.is_blacklisted(domain):
+ # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
+ continue
+ elif domain in domains:
+ # DEBUG: print(f"DEBUG: domain='{domain}' is already added - SKIPPED!")
+ continue
+ elif instances.is_registered(domain):
+ # DEBUG: print(f"DEBUG: domain='{domain}' is already registered - SKIPPED!")
+ continue
- except BaseException as exception:
- print(f"ERROR: Cannot fetch feed='{feed}',exception[{type(exception)}]:'{str(exception)}'")
- sys.exit(255)
+ # DEBUG: print(f"DEBUG: Adding domain='{domain}',domains()={len(domains)}")
+ domains.append(domain)
# DEBUG: print(f"DEBUG: domains({len(domains)})={domains}")
if len(domains) > 0:
# DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...")
peers = list()
- try:
- response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
- # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
+ response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
+
+ data = json_from_response(response)
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+
+ if not response.ok or response.status_code >= 400:
+ # DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...")
+ response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
-
if not response.ok or response.status_code >= 400:
- # DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...")
- response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
-
- data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
- if not response.ok or response.status_code >= 400:
- print("WARNING: Could not reach any JSON API:", domain)
- instances.update_last_error(domain, response)
- elif response.ok and isinstance(data, list):
- # DEBUG: print(f"DEBUG: domain='{domain}' returned a list: '{data}'")
- sys.exit(255)
- elif "federated_instances" in data:
- # DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
- peers = peers + add_peers(data["federated_instances"])
- # DEBUG: print("DEBUG: Added instance(s) to peers")
- else:
- print("WARNING: JSON response does not contain 'federated_instances':", domain)
- instances.update_last_error(domain, response)
+ print("WARNING: Could not reach any JSON API:", domain)
+ instances.update_last_error(domain, response)
+ elif response.ok and isinstance(data, list):
+ print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
+ sys.exit(255)
+ elif "federated_instances" in data:
+ # DEBUG: print(f"DEBUG: Found federated_instances for domain='{domain}'")
+ peers = peers + add_peers(data["federated_instances"])
+ # DEBUG: print("DEBUG: Added instance(s) to peers")
else:
- # DEBUG: print("DEBUG: Querying API was successful:", domain, len(data))
- peers = data
-
- except BaseException as exception:
- print("WARNING: Some error during fetch_peers():", domain, exception)
- instances.update_last_error(domain, exception)
+ print("WARNING: JSON response does not contain 'federated_instances':", domain)
+ instances.update_last_error(domain, response)
+ else:
+ # DEBUG: print("DEBUG: Querying API was successful:", domain, len(data))
+ peers = data
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
instances.set_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: path='{path}' does not match request='{request}' - SKIPPED!")
continue
- try:
- # DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...")
- response = network.fetch_response(domain, request, network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
-
- data = json_from_response(response)
- # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
- if response.ok and isinstance(data, dict):
- # DEBUG: print("DEBUG: Success:", request)
- instances.set_data("detection_mode", domain, "STATIC_CHECK")
- instances.set_data("nodeinfo_url" , domain, request)
- break
- elif response.ok and isinstance(data, list):
- print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
- sys.exit(255)
- elif not response.ok or response.status_code >= 400:
- print("WARNING: Failed fetching nodeinfo from domain:", domain)
- instances.update_last_error(domain, response)
- continue
+ # DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...")
+ response = network.fetch_response(domain, request, network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
- except BaseException as exception:
- # DEBUG: print("DEBUG: Cannot fetch API request:", request)
- instances.update_last_error(domain, exception)
- pass
+ data = json_from_response(response)
+ # DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+ if response.ok and isinstance(data, dict):
+ # DEBUG: print("DEBUG: Success:", request)
+ instances.set_data("detection_mode", domain, "STATIC_CHECK")
+ instances.set_data("nodeinfo_url" , domain, request)
+ break
+ elif response.ok and isinstance(data, list):
+ print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
+ sys.exit(255)
+ elif not response.ok or response.status_code >= 400:
+ print("WARNING: Failed fetching nodeinfo from domain:", domain)
+ instances.update_last_error(domain, response)
+ continue
# DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
return data
# DEBUG: print("DEBUG: Fetching .well-known info for domain:", domain)
data = {}
- try:
- response = network.fetch_response(domain, "/.well-known/nodeinfo", network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
-
- data = json_from_response(response)
- # DEBUG: print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data))
- if response.ok and isinstance(data, dict):
- nodeinfo = data
- # DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain)
- if "links" in nodeinfo:
- # DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"]))
- for link in nodeinfo["links"]:
- # DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
- if link["rel"] in nodeinfo_identifier:
- # DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
- response = fetch_url(link["href"], network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
-
- data = json_from_response(response)
- # DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
- if response.ok and isinstance(data, dict):
- # DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
- instances.set_data("detection_mode", domain, "AUTO_DISCOVERY")
- instances.set_data("nodeinfo_url" , domain, link["href"])
- break
- else:
- print("WARNING: Unknown 'rel' value:", domain, link["rel"])
- else:
- print("WARNING: nodeinfo does not contain 'links':", domain)
-
- except BaseException as exception:
- print("WARNING: Failed fetching .well-known info:", domain)
- instances.update_last_error(domain, exception)
- pass
+ response = network.fetch_response(domain, "/.well-known/nodeinfo", network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
+
+ data = json_from_response(response)
+ # DEBUG: print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data))
+ if response.ok and isinstance(data, dict):
+ nodeinfo = data
+ # DEBUG: print("DEBUG: Found entries:", len(nodeinfo), domain)
+ if "links" in nodeinfo:
+ # DEBUG: print("DEBUG: Found links in nodeinfo():", len(nodeinfo["links"]))
+ for link in nodeinfo["links"]:
+ # DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
+ if link["rel"] in nodeinfo_identifier:
+ # DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
+ response = fetch_url(link["href"], network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ data = json_from_response(response)
+ # DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
+ if response.ok and isinstance(data, dict):
+ # DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
+ instances.set_data("detection_mode", domain, "AUTO_DISCOVERY")
+ instances.set_data("nodeinfo_url" , domain, link["href"])
+ break
+ else:
+ print("WARNING: Unknown 'rel' value:", domain, link["rel"])
+ else:
+ print("WARNING: nodeinfo does not contain 'links':", domain)
# DEBUG: print("DEBUG: Returning data[]:", type(data))
return data
# DEBUG: print(f"DEBUG: domain='{domain}',path='{path}' - CALLED!")
software = None
- try:
- # DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
- response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
-
- # DEBUG: print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
- if response.ok and response.status_code < 300 and len(response.text) > 0:
- # DEBUG: print("DEBUG: Search for <meta name='generator'>:", domain)
- doc = bs4.BeautifulSoup(response.text, "html.parser")
-
- # DEBUG: print("DEBUG: doc[]:", type(doc))
- generator = doc.find("meta", {"name": "generator"})
- site_name = doc.find("meta", {"property": "og:site_name"})
-
- # DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'")
- if isinstance(generator, bs4.element.Tag):
- # DEBUG: print("DEBUG: Found generator meta tag:", domain)
- software = tidyup_domain(generator.get("content"))
- print(f"INFO: domain='{domain}' is generated by '{software}'")
- instances.set_data("detection_mode", domain, "GENERATOR")
- remove_pending_error(domain)
- elif isinstance(site_name, bs4.element.Tag):
- # DEBUG: print("DEBUG: Found property=og:site_name:", domain)
- sofware = tidyup_domain(site_name.get("content"))
- print(f"INFO: domain='{domain}' has og:site_name='{software}'")
- instances.set_data("detection_mode", domain, "SITE_NAME")
- remove_pending_error(domain)
-
- except BaseException as exception:
- # DEBUG: print(f"DEBUG: Cannot fetch / from '{domain}':", exception)
- instances.update_last_error(domain, exception)
- pass
+ # DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
+ response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ # DEBUG: print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
+ if response.ok and response.status_code < 300 and len(response.text) > 0:
+ # DEBUG: print("DEBUG: Search for <meta name='generator'>:", domain)
+ doc = bs4.BeautifulSoup(response.text, "html.parser")
+
+ # DEBUG: print("DEBUG: doc[]:", type(doc))
+ generator = doc.find("meta", {"name": "generator"})
+ site_name = doc.find("meta", {"property": "og:site_name"})
+
+ # DEBUG: print(f"DEBUG: generator='{generator}',site_name='{site_name}'")
+ if isinstance(generator, bs4.element.Tag):
+ # DEBUG: print("DEBUG: Found generator meta tag:", domain)
+ software = tidyup_domain(generator.get("content"))
+ print(f"INFO: domain='{domain}' is generated by '{software}'")
+ instances.set_data("detection_mode", domain, "GENERATOR")
+ remove_pending_error(domain)
+ elif isinstance(site_name, bs4.element.Tag):
+ # DEBUG: print("DEBUG: Found property=og:site_name:", domain)
+ software = tidyup_domain(site_name.get("content"))
+ print(f"INFO: domain='{domain}' has og:site_name='{software}'")
+ instances.set_data("detection_mode", domain, "SITE_NAME")
+ remove_pending_error(domain)
# DEBUG: print(f"DEBUG: software[]={type(software)}")
if isinstance(software, str) and software == "":