)
parser.set_defaults(command=commands.fetch_fbabot_atom)
- ### Fetch blocks from federater ###
+ ### Fetch blocks from oliphant's GIT repository ###
parser = subparser_command.add_parser(
"fetch_oliphant",
help="Fetches CSV files (block recommendations) for more possible instances to disover",
parser.add_argument("--single", action="store_true", help="Only fetch given instance.")
parser.set_defaults(command=commands.fetch_instances)
+ ### Fetch blocks from static text file(s) ###
+ parser = subparser_command.add_parser(
+ "fetch_txt",
+ help="Fetches text/plain files as simple domain lists",
+ )
+ parser.set_defaults(command=commands.fetch_txt)
+
# DEBUG: print("DEBUG: init_parser(): EXIT!")
def run_command():
if not validators.domain(blocked):
print(f"WARNING: blocked='{blocked}',software='{software}' is not a valid domain name - skipped!")
continue
- elif blocked.split(".")[-1] == "arpa":
- print(f"WARNING: blocked='{blocked}' is a reversed .arpa domain and should not be used generally.")
- continue
elif not instances.is_registered(blocked):
# DEBUG: print("DEBUG: Hash wasn't found, adding:", blocked, blocker)
try:
}
)
+ domains = list()
for block in blocklists:
# Is domain given and not equal blocker?
if isinstance(args.domain, str) and args.domain != block["blocker"]:
# DEBUG: print(f"DEBUG: Skipping blocker='{block['blocker']}', not matching args.domain='{args.domain}'")
continue
+ elif domain in domains:
+ # DEBUG: print(f"DEBUG: domain='{domain}' already handled - skipped!")
+ continue
# Fetch this URL
print(f"INFO: Fetching csv_url='{block['csv_url']}' for blocker='{block['blocker']}' ...")
elif "domain" in row:
domain = row["domain"]
else:
- print(f"DEBUG: row='{row}' does not contain domain column")
+ # DEBUG: print(f"DEBUG: row='{row}' does not contain domain column")
continue
- if domain.find("*") > 0:
- # Try to de-obscure it
- row = instances.deobscure("*", domain)
+ # DEBUG: print(f"DEBUG: Marking domain='{domain}' as handled")
+ domains.append(domain)
- # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
- if row is None:
- print(f"WARNING: Cannot de-obfucate domain='{domain}' - skipped!")
- continue
+ # DEBUG: print(f"DEBUG: Processing domain='{domain}' ...")
+ processed = fba.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
- # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
- domain = row[0]
+ # DEBUG: print(f"DEBUG: processed='{processed}'")
- if not validators.domain(domain):
- print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
- continue
- elif blacklist.is_blacklisted(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - skipped!")
- continue
- elif instances.is_recent(domain):
- # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - skipped!")
+ # DEBUG: print("DEBUG: EXIT!")
+
+def fetch_txt(args: argparse.Namespace):
+ # DEBUG: print(f"DEBUG: args[]='{type(args)}' - CALLED!")
+ locking.acquire()
+
+ # Static URLs
+ urls = (
+ "https://seirdy.one/pb/bsl.txt",
+ )
+
+ print(f"INFO: Checking {len(urls)} text file(s) ...")
+ for url in urls:
+ # DEBUG: print(f"DEBUG: Fetching url='{url}' ...")
+ response = fba.fetch_url(url, network.web_headers, (config.get("connection_timeout"), config.get("read_timeout")))
+
+ # DEBUG: print(f"DEBUG: response[]='{type(response)}'")
+ if response.ok and response.text != "":
+ # DEBUG: print(f"DEBUG: Returned {len(response.text.strip())} Bytes for processing")
+ domains = response.text.split("\n")
+
+ print(f"INFO: Processing {len(domains)} domains ...")
+ for domain in domains:
+ if domain == "":
continue
- try:
- print(f"INFO: Fetching instances for instane='{domain}' ...")
- federation.fetch_instances(domain, block["blocker"], None, inspect.currentframe().f_code.co_name)
- except network.exceptions as exception:
- print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
- instances.update_last_error(domain, exception)
+ # DEBUG: print(f"DEBUG: domain='{domain}'")
+ processed = fba.process_domain(domain, 'seirdy.one', inspect.currentframe().f_code.co_name)
+
+ # DEBUG: print(f"DEBUG: processed='{processed}'")
+ if not processed:
+ # DEBUG: print(f"DEBUG: domain='{domain}' was not generically processed - skipped!")
+ continue
# DEBUG: print("DEBUG: EXIT!")
from urllib.parse import urlparse
import requests
+import validators
+from fba import blacklist
from fba import config
+from fba import federation
+from fba import instances
from fba import network
# Connect to database
# DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
return response
+
+def process_domain(domain: str, blocker: str, command: str) -> bool:
+ # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
+ if not isinstance(domain, str):
+ raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
+ elif domain == "":
+ raise ValueError("Parameter 'domain' is empty")
+ elif not isinstance(blocker, str):
+ raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
+ elif blocker == "":
+ raise ValueError("Parameter 'blocker' is empty")
+ elif not isinstance(command, str):
+ raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
+ elif command == "":
+ raise ValueError("Parameter 'command' is empty")
+
+ if domain.find("*") > 0:
+ # Try to de-obscure it
+ row = instances.deobscure("*", domain)
+
+ # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+ if row is None:
+ print(f"WARNING: Cannot de-obfucate domain='{domain}' - skipped!")
+ return False
+
+ # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+ domain = row[0]
+ elif domain.find("?") > 0:
+ # Try to de-obscure it
+ row = instances.deobscure("?", domain)
+
+ # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
+ if row is None:
+ print(f"WARNING: Cannot de-obfucate domain='{domain}' - skipped!")
+ return False
+
+ # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
+ domain = row[0]
+
+ if not validators.domain(domain):
+ print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
+ return False
+ elif domain.split(".")[-1] == "arpa":
+ print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
+ return False
+ elif blacklist.is_blacklisted(domain):
+ # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - skipped!")
+ return False
+ elif instances.is_recent(domain):
+ # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - skipped!")
+ return False
+
+ try:
+ print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
+ federation.fetch_instances(domain, blocker, None, command)
+ except network.exceptions as exception:
+ print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
+ instances.update_last_error(domain, exception)
+ return False
+
+ # DEBUG: print(f"DEBUG: Success! - EXIT!")
+ return True