logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)
-def post_json_api(domain: str, path: str, data: str = "", headers: dict = {}) -> dict:
+def post_json_api(domain: str, path: str, data: str = "", headers: dict = None) -> dict:
- logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers))
+ logger.debug("domain='%s',path='%s',data='%s',headers()=%d - CALLED!", domain, path, data, len(headers) if headers is not None else 0)
domain_helper.raise_on(domain)
raise ValueError(f"path='{path}' does not start with / but should")
elif not isinstance(data, str):
raise ValueError(f"data[]='{type(data)}' is not of type 'str'")
- elif not isinstance(headers, dict):
- raise ValueError(f"headers[]='{type(headers)}' is not of type 'list'")
+ elif headers is not None and not isinstance(headers, dict):
+ raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'")
json_reply = {
"status_code": 200,
try:
logger.debug("Sending GET to domain='%s',path='%s',timeout(%d)='%s'", domain, path, len(timeout), timeout)
- response = fetch_response(domain, path, {**api_headers, **headers}, timeout)
+ response = _fetch_response(domain, path, {**api_headers, **headers}, timeout)
except exceptions as exception:
logger.debug("Fetching path='%s' from domain='%s' failed. exception[%s]='%s'", path, domain, type(exception), str(exception))
json_reply["status_code"] = 999
logger.debug("response.ok='%s',response.status_code=%d,response.text()=%d", response.ok, response.status_code, len(response.text))
return response.ok and response.status_code == 200 and response.text.strip() != ""
-def fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response:
+def _fetch_response(domain: str, path: str, headers: dict, timeout: tuple, allow_redirects: bool = False) -> requests.models.Response:
logger.debug("domain='%s',path='%s',headers()=%d,timeout='%s',allow_redirects='%s' - CALLED!", domain, path, len(headers), timeout, allow_redirects)
domain_helper.raise_on(domain)
logger.debug("components[%s]='%s'", type(components), components)
if components.query != "":
logger.debug("Fetching path='%s?%s' from netloc='%s' ...", components.path, components.query, components.netloc)
- response = fetch_response(
+ response = _fetch_response(
components.netloc.split(":")[0],
f"{components.path}?{components.query}",
headers=headers,
)
else:
logger.debug("Fetching path='%s' from netloc='%s' ...", components.path, components.netloc)
- response = fetch_response(
+ response = _fetch_response(
components.netloc.split(":")[0],
components.path if isinstance(components.path, str) and components.path != '' else '/',
headers=headers,
logger.debug("response[]='%s' - EXIT!", type(response))
return response
-def fetch_json_rows(hostname: str, path: str, headers: dict = {}, rows_key: str = None):
+def fetch_json_rows(hostname: str, path: str, headers: dict = None, rows_key: str = None):
- logger.debug("hostname='%s',path='%s',headers()=%d,rows_key='%s' - CALLED!", hostname, path, len(headers), rows_key)
+ logger.debug("hostname='%s',path='%s',headers()=%d,rows_key='%s' - CALLED!", hostname, path, len(headers) if headers is not None else 0, rows_key)
if not isinstance(hostname, str):
raise ValueError("Parameter 'path' is an empty string")
elif not path.startswith("/"):
raise ValueError(f"path='{path}' does not start with a slash")
- elif not isinstance(headers, dict):
+ elif headers is not None and not isinstance(headers, dict):
raise ValueError(f"headers[]='{type(headers)}' is not of type 'dict'")
elif not isinstance(rows_key, str) and rows_key is not None:
raise ValueError(f"rows_key[]='{type(rows_key)}' is not of type 'str'")
raise ValueError(f"allow_redirects[]='{type(allow_redirects)}' is not of type 'bool'")
logger.debug("Fetching path='%s' from domain='%s' ...", path, domain)
- response = fetch_response(
+ response = _fetch_response(
domain,
path,
headers=web_headers,
logger.debug("Instance has no block list: domain='%s' - EXIT!", domain)
return []
+ # Init local variables
+ rows = ()
+
+ # Try to find table
table = block_tag.find("table")
logger.debug("table[]='%s'", type(table))
if table is None:
logger.warning("domain='%s' has no table tag - EXIT !", domain)
return []
- elif table.find("tbody"):
- rows = table.find("tbody").find_all("tr")
- else:
- rows = table.find_all("tr")
- logger.debug("Found rows()=%d", len(rows))
+ # Find all rows in table
+ rows = table.find_all("tr")
+
+ logger.debug("Found rows[%s]()=%d", type(rows), len(rows))
for line in rows:
logger.debug("line[%s]='%s'", type(line), line)
- blocked = line.find_all("td")[0].text
- reason = line.find_all("td")[1].text
+ tds = line.find_all("td")
+
+ logger.debug("tds[%s]()=%d", type(tds), len(tds))
+ blocked = tds[0].text.strip()
+ reason = tds[1].text.strip()
logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason)
blocked = tidyup.domain(blocked) if blocked != "" else None
logger.warning("rows()=%d does not have key 'metadata', domain='%s' - EXIT!", len(rows), domain)
return []
elif "federation" not in rows["metadata"]:
- logger.warning("rows()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain)
+ logger.warning("rows[metadata]()=%d does not have key 'federation', domain='%s' - EXIT!", len(rows["metadata"]), domain)
return []
found = False
if blocked in [None, ""]:
logger.debug("blocked[%s]='%s' is empty after tidyup.domain(): domain='%s',block_level='%s'", type(blocked), blocked, domain, block_level)
continue
- elif not domain_helper.is_wanted(blocked):
+ elif not domain_helper.is_tld_wanted(blocked):
+ logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
+ continue
+ elif validators.domain(blocked, rfc_2782=True) and blacklist.is_blacklisted(blocked):
+ logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
+ continue
+ elif blocked.find("*") == -1 and blocked.find("?") == -1 and not domain_helper.is_wanted(blocked):
logger.debug("blocked='%s' is not wanted - SKIPPED!", blocked)
continue
logger.debug("Found block_level='%s', importing domain blocks ...", block_level)
for line in header.find_next("table").find_all("tr")[1:]:
logger.debug("line[]='%s'", type(line))
- blocked = line.find_all("td")[0].text
- reason = line.find_all("td")[1].text
+ tds = line.find_all("td")
+
+ logger.debug("tds[%s]()=%d", type(tds), len(tds))
+ blocked = tds[0].text
+ reason = tds[1].text
logger.debug("blocked='%s',reason='%s' - BEFORE!", blocked, reason)
blocked = tidyup.domain(blocked) if blocked != "" else None