# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-import bs4
import hashlib
import re
-import requests
import json
import sqlite3
import sys
import time
+
+import bs4
+import requests
import validators
from urllib.parse import urlparse
from fba import blacklist
-from fba import cache
from fba import config
from fba import instances
from fba import network
"http://nodeinfo.diaspora.software/ns/schema/1.0",
]
-# HTTP headers for non-API requests
-headers = {
- "User-Agent": config.get("useragent"),
-}
-
-# HTTP headers for API requests
-api_headers = {
- "User-Agent": config.get("useragent"),
- "Content-Type": "application/json",
-}
-
# Connect to database
connection = sqlite3.connect("blocks.db")
cursor = connection.cursor()
# DEBUG: print(f"DEBUG: Fetching peers from '{domain}',software='{software}' ...")
peers = list()
try:
- response = network.fetch_response(domain, "/api/v1/instance/peers", api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_response(domain, "/api/v1/instance/peers", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
if not response.ok or response.status_code >= 400:
# DEBUG: print(f"DEBUG: Was not able to fetch peers, trying alternative ...")
- response = network.fetch_response(domain, "/api/v3/site", api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_response(domain, "/api/v3/site", network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
try:
# DEBUG: print(f"DEBUG: Fetching request='{request}' from domain='{domain}' ...")
- response = network.fetch_response(domain, request, api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
+ response = network.fetch_response(domain, request, network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
data = json_from_response(response)
# DEBUG: print(f"DEBUG: response.ok={response.ok},response.status_code={response.status_code},data[]='{type(data)}'")
data = {}
try:
- response = network.fetch_response(domain, "/.well-known/nodeinfo", api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
+ response = network.fetch_response(domain, "/.well-known/nodeinfo", network.api_headers, (config.get("nodeinfo_connection_timeout"), config.get("nodeinfo_read_timeout")))
data = json_from_response(response)
# DEBUG: print("DEBUG: domain,response.ok,data[]:", domain, response.ok, type(data))
# DEBUG: print("DEBUG: rel,href:", link["rel"], link["href"])
if link["rel"] in nodeinfo_identifier:
# DEBUG: print("DEBUG: Fetching nodeinfo from:", link["href"])
- response = fetch_url(link["href"], api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = fetch_url(link["href"], network.api_headers, (config.get("connection_timeout"), config.get("read_timeout")))
data = json_from_response(response)
# DEBUG: print("DEBUG: href,response.ok,response.status_code:", link["href"], response.ok, response.status_code)
try:
# DEBUG: print(f"DEBUG: Fetching path='{path}' from '{domain}' ...")
- response = network.fetch_response(domain, path, headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_response(domain, path, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
# DEBUG: print("DEBUG: domain,response.ok,response.status_code,response.text[]:", domain, response.ok, response.status_code, type(response.text))
if response.ok and response.status_code < 300 and len(response.text) > 0:
# DEBUG: print(f"DEBUG: tag[]={type(tag)} - CALLED!")
if not isinstance(tag, bs4.element.Tag):
raise ValueError(f"Parameter tag[]={type(tag)} is not type of bs4.element.Tag")
- elif not isinstance(tag, bs4.element.Tag):
- raise KeyError("Cannot find table with instances!")
elif len(tag.select("tr")) == 0:
raise KeyError("No table rows found in table!")
# DEBUG: print(f"DEBUG: domains()={len(domains)} - EXIT!")
return domains
-def fetch_url(url: str, headers: dict, timeout: list) -> requests.models.Response:
+def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
# DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
if not isinstance(url, str):
raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
raise ValueError("Parameter 'url' is empty")
elif not isinstance(headers, dict):
raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
- elif not isinstance(timeout, list):
- raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'list'")
+ elif not isinstance(timeout, tuple):
+ raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
# DEBUG: print(f"DEBUG: Parsing url='{url}'")
components = urlparse(url)