types = list()
if args.software is None:
logger.info("Fetching software list ...")
- raw = utils.fetch_url(
+ raw = network.fetch_url(
f"https://{source_domain}",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
        )
logger.debug("Fetching domainblocks from source_domain='%s'", source_domain)
- raw = utils.fetch_url(
+ raw = network.fetch_url(
f"https://{source_domain}/todon/domainblocks",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
sources.update(source_domain)
logger.info("Fetching federation.md from source_domain='%s' ...", source_domain)
- raw = utils.fetch_url(
+ raw = network.fetch_url(
f"https://{source_domain}/chaossocial/meta/master/federation.md",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
sources.update(domain)
logger.info("Fetch FBA-specific RSS args.feed='%s' ...", args.feed)
- response = utils.fetch_url(args.feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_url(args.feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code == 200 and len(response.text) > 0:
domains = list()
logger.info("Fetching ATOM feed='%s' from FBA bot account ...", feed)
- response = utils.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_url(feed, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code == 200 and len(response.text) > 0:
logger.info("Checking %d text file(s) ...", len(blocklists.txt_files))
for row in blocklists.txt_files:
logger.debug("Fetching row[url]='%s' ...", row["url"])
- response = utils.fetch_url(row["url"], network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+ response = network.fetch_url(row["url"], network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
if response.ok and response.status_code == 200 and response.text != "":
sources.update(source_domain)
logger.info("Fetching / from source_domain='%s' ...", source_domain)
- response = utils.fetch_url(
+ response = network.fetch_url(
f"https://{source_domain}",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
sources.update(source_domain)
logger.info("Fetching instances from source_domain='%s' ...", source_domain)
- raw = utils.fetch_url(
+ raw = network.fetch_url(
f"https://{source_domain}/api/v1/instances",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
sources.update(source_domain)
logger.info("Fetching instances.json from source_domain='%s' ...", source_domain)
- raw = utils.fetch_url(
+ raw = network.fetch_url(
f"https://{source_domain}/instances.json",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
continue
else:
logger.info("Fetching / from relay row[domain]='%s',row[software]='%s' ...", row["domain"], row["software"])
- raw = utils.fetch_url(
+ raw = network.fetch_url(
f"https://{row['domain']}",
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
# Fetch this URL
logger.info("Fetching url='%s' for blocker='%s' ...", url, blocker)
- response = utils.fetch_url(
+ response = network.fetch_url(
url,
network.web_headers,
(config.get("connection_timeout"), config.get("read_timeout"))
import logging
import time
+from urllib.parse import urlparse
+
import reqto
import requests
import urllib3
+import validators
try:
logger.debug("Fetching url='%s' ...", url)
- response = utils.fetch_url(url, api_headers, timeout)
+ response = fetch_url(url, api_headers, timeout)
logger.debug("response.ok='%s',response.status_code=%d,response.reason='%s'", response.ok, response.status_code, response.reason)
if response.ok and response.status_code == 200:
logger.debug("response[]='%s' - EXIT!", type(response))
return response
+
+def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
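+    # Fetches the given url with the supplied headers and a (connect, read)
+    # timeout tuple; validates all parameters first, then splits the URL and
+    # delegates the actual request to fetch_response().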
+ logger.debug("url='%s',headers()=%d,timeout(%d)='%s' - CALLED!", url, len(headers), len(timeout), timeout)
+
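+    # Guard clauses: reject wrong parameter types and malformed URLs
+    # before any network activity takes place.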
+ if not isinstance(url, str):
+ raise ValueError(f"Parameter url[]='{type(url)}' is not of type 'str'")
+ elif url == "":
+ raise ValueError("Parameter 'url' is empty")
+ elif not validators.url(url):
+ raise ValueError(f"Parameter url='{url}' is not a valid URL")
+ elif not isinstance(headers, dict):
+ raise ValueError(f"Parameter headers[]='{type(headers)}' is not of type 'dict'")
+ elif not isinstance(timeout, tuple):
+ raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not of type 'tuple'")
+
+ logger.debug("Parsing url='%s' ...", url)
+ components = urlparse(url)
+
+ # Invoke other function, avoid trailing ?
+ logger.debug("components[%s]='%s'", type(components), components)
+ if components.query != "":
+ logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc)
+ response = fetch_response(
+ components.netloc.split(":")[0],
+ f"{components.path}?{components.query}",
+ headers,
+ timeout
+ )
+ else:
+ logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
+ response = fetch_response(
+ components.netloc.split(":")[0],
+ components.path if isinstance(components.path, str) and components.path != '' else '/',
+ headers,
+ timeout
+ )
+
+ logger.debug("response[]='%s' - EXIT!", type(response))
+ return response
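+
+# Minimal usage sketch for the function above (URL and timeout values are
+# illustrative only, not taken from the codebase):
+#
+#   response = fetch_url(
+#       "https://example.org/instances.json",
+#       web_headers,
+#       (5.0, 10.0)
+#   )
+#   if response.ok and response.status_code == 200:
+#       data = response.json()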
# Invoke other function, avoid trailing ?
logger.debug("components[%s]='%s'", type(components), components)
if components.query != "":
+ logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc)
response = network.fetch_response(
components.netloc.split(":")[0],
f"{components.path}?{components.query}",
timeout
)
else:
+ logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
response = network.fetch_response(
components.netloc.split(":")[0],
components.path if isinstance(components.path, str) and components.path != '' else '/',
domain = tidyup.domain(tag.find("em").contents[0])
logger.debug("domain='%s' - AFTER!", domain)
+ logger.debug("domain='%s' - AFTER2!", domain)
if domain == "":
logger.warning("Empty domain after checking search='%s' and <em> tags - SKIPPED!", search)
continue
logger.debug("Setting has_obfuscation=False for blocker='%s' ...", blocker)
instances.set_has_obfuscation(blocker, False)
+ logger.debug("Checking domain='%s' ...", domain)
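+    # An asterisk in the domain means the blocker publishes obfuscated
+    # (wildcarded) entries instead of full domain names.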
if domain.find("*") >= 0:
logger.debug("blocker='%s' uses obfuscated domains", blocker)
instances.set_has_obfuscation(blocker, True)