# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import bs4
import hashlib
import re
-import requests
import json
import sqlite3
import sys
import time
-import validators
from urllib.parse import urlparse
+import bs4
+import requests
+import validators
+
from fba import blacklist
-from fba import cache
from fba import config
from fba import instances
from fba import network
elif not isinstance(origin, str) and origin is not None:
raise ValueError(f"Parameter origin[]={type(origin)} is not 'str'")
elif software is None:
- print(f"DEBUG: software for domain='{domain}' is not set, determining ...")
+ # DEBUG: print(f"DEBUG: software for domain='{domain}' is not set, determining ...")
software = determine_software(domain, path)
- print(f"DEBUG: Determined software='{software}' for domain='{domain}'")
+ # DEBUG: print(f"DEBUG: Determined software='{software}' for domain='{domain}'")
elif not isinstance(software, str):
raise ValueError(f"Parameter software[]={type(software)} is not 'str'")
elif not isinstance(script, str):
# DEBUG: print("DEBUG: Fetching instances for domain:", domain, software)
peerlist = fetch_peers(domain, software)
- if (peerlist is None):
+ if peerlist is None:
print("ERROR: Cannot fetch peers:", domain)
return
elif instances.has_pending_instance_data(domain):
if not instances.is_registered(instance):
# DEBUG: print("DEBUG: Adding new instance:", instance, domain)
instances.add(instance, domain, script)
- except BaseException as exception:
- print(f"ERROR: instance='{instance}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: instance='{instance}',exc[{type(exc)}]:'{str(exc)}'")
continue
# DEBUG: print("DEBUG: EXIT!")
raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
elif domain == "":
raise ValueError("Parameter 'domain' is empty")
+ elif config.get("write_error_log").lower() != "true":
+ # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
+ return
try:
# DEBUG: print("DEBUG: BEFORE response[]:", type(response))
if isinstance(response, BaseException) or isinstance(response, json.decoder.JSONDecodeError):
- response = str(response)
+ response = f"response[{type(response)}]='{str(response)}'"
# DEBUG: print("DEBUG: AFTER response[]:", type(response))
if isinstance(response, str):
# Cleanup old entries
# DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
- except BaseException as exception:
- print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
+ except BaseException as exc:
+ print(f"ERROR: failed SQL query: domain='{domain}',exc[{type(exc)}]:'{str(exc)}'")
sys.exit(255)
# DEBUG: print("DEBUG: EXIT!")
peers = list()
try:
response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
data = json_from_response(response)
-
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
+
if not response.ok or response.status_code >= 400:
- # DEBUG: print(f"DEBUG: Was not able to fetch peers, trying alternative ...")
+ # DEBUG: print("DEBUG: Was not able to fetch peers, trying alternative ...")
response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
# DEBUG: print("DEBUG: Querying API was successful:", domain, len(data))
peers = data
- except BaseException as exception:
- print("WARNING: Some error during get():", domain, exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ print("WARNING: Some error during fetch_peers():", domain, exc)
+ instances.update_last_error(domain, exc)
# DEBUG: print(f"DEBUG: Adding '{len(peers)}' for domain='{domain}'")
- instances.set("total_peers", domain, len(peers))
+ instances.set_data("total_peers", domain, len(peers))
# DEBUG: print(f"DEBUG: Updating last_instance_fetch for domain='{domain}' ...")
instances.update_last_instance_fetch(domain)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if response.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Success:", request)
- instances.set("detection_mode", domain, "STATIC_CHECK")
- instances.set("nodeinfo_url" , domain, request)
+ instances.set_data("detection_mode", domain, "STATIC_CHECK")
+ instances.set_data("nodeinfo_url" , domain, request)
break
elif response.ok and isinstance(data, list):
print(f"UNSUPPORTED: domain='{domain}' returned a list: '{data}'")
instances.update_last_error(domain, response)
continue
- except BaseException as exception:
+ except BaseException as exc:
# DEBUG: print("DEBUG: Cannot fetch API request:", request)
- instances.update_last_error(domain, exception)
+ instances.update_last_error(domain, exc)
pass
# DEBUG: print(f"DEBUG: data()={len(data)} - EXIT!")
# DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
if response.ok and isinstance(data, dict):
# DEBUG: print("DEBUG: Found JSON nodeinfo():", len(data))
- instances.set("detection_mode", domain, "AUTO_DISCOVERY")
- instances.set("nodeinfo_url" , domain, link["href"])
+ instances.set_data("detection_mode", domain, "AUTO_DISCOVERY")
+ instances.set_data("nodeinfo_url" , domain, link["href"])
break
else:
print("WARNING: Unknown 'rel' value:", domain, link["rel"])
else:
print("WARNING: nodeinfo does not contain 'links':", domain)
- except BaseException as exception:
+ except BaseException as exc:
print("WARNING: Failed fetching .well-known info:", domain)
- instances.update_last_error(domain, exception)
+ instances.update_last_error(domain, exc)
pass
# DEBUG: print("DEBUG: Returning data[]:", type(data))
try:
# DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
- response = network.fetch_response(domain, path, network.headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
# DEBUG: print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
# DEBUG: print("DEBUG: Found generator meta tag:", domain)
software = tidyup_domain(generator.get("content"))
print(f"INFO: domain='{domain}' is generated by '{software}'")
- instances.set("detection_mode", domain, "GENERATOR")
+ instances.set_data("detection_mode", domain, "GENERATOR")
remove_pending_error(domain)
elif isinstance(site_name, bs4.element.Tag):
# DEBUG: print("DEBUG: Found property=og:site_name:", domain)
sofware = tidyup_domain(site_name.get("content"))
print(f"INFO: domain='{domain}' has og:site_name='{software}'")
- instances.set("detection_mode", domain, "SITE_NAME")
+ instances.set_data("detection_mode", domain, "SITE_NAME")
remove_pending_error(domain)
- except BaseException as exception:
- # DEBUG: print(f"DEBUG: Cannot fetch / from '{domain}':", exception)
- instances.update_last_error(domain, exception)
+ except BaseException as exc:
+ # DEBUG: print(f"DEBUG: Cannot fetch / from '{domain}':", exc)
+ instances.update_last_error(domain, exc)
pass
# DEBUG: print(f"DEBUG: software[]={type(software)}")
software = "misskey"
elif software.find("/") > 0:
print("WARNING: Spliting of slash:", software)
- software = tidup_domain(software.split("/")[-1]);
+ software = tidyup_domain(software.split("/")[-1])
elif software.find("|") > 0:
print("WARNING: Spliting of pipe:", software)
- software = tidyup_domain(software.split("|")[0]);
+ software = tidyup_domain(software.split("|")[0])
elif "powered by" in software:
# DEBUG: print(f"DEBUG: software='{software}' has 'powered by' in it")
software = strip_powered_by(software)
print(f"WARNING: domain='{domain}' is blacklisted - skipped!")
continue
elif domain == "gab.com/.ai, develop.gab.com":
- # DEBUG: print(f"DEBUG: Multiple domains detected in one row")
+ # DEBUG: print("DEBUG: Multiple domains detected in one row")
domains.append({
"domain": "gab.com",
"reason": reason,