1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
24 from fba import blacklist
25 from fba import federation
26 from fba import network
28 from fba.models import instances
31 connection = sqlite3.connect("blocks.db")
32 cursor = connection.cursor()
34 ##### Other functions #####
36 def is_primitive(var: any) -> bool:
37 # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
38 return type(var) in {int, str, float, bool} or var is None
40 def get_hash(domain: str) -> str:
41 if not isinstance(domain, str):
42 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
44 raise ValueError("Parameter 'domain' is empty")
46 return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
49 # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
50 if not isinstance(url, str):
51 raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
53 raise ValueError("Parameter 'url' is empty")
54 elif not isinstance(headers, dict):
55 raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
56 elif not isinstance(timeout, tuple):
57 raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
59 # DEBUG: print(f"DEBUG: Parsing url='{url}'")
60 components = urlparse(url)
62 # Invoke other function, avoid trailing ?
63 # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
64 if components.query != "":
65 response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
67 response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
69 # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
72 def process_domain(domain: str, blocker: str, command: str) -> bool:
73 # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
74 if not isinstance(domain, str):
75 raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
77 raise ValueError("Parameter 'domain' is empty")
78 elif not isinstance(blocker, str):
79 raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
81 raise ValueError("Parameter 'blocker' is empty")
82 elif not isinstance(command, str):
83 raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
85 raise ValueError("Parameter 'command' is empty")
87 if domain.find("*") > 0:
88 # Try to de-obscure it
89 row = instances.deobscure("*", domain)
91 # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
93 print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
96 # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
98 elif domain.find("?") > 0:
99 # Try to de-obscure it
100 row = instances.deobscure("?", domain)
102 # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
104 print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
107 # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
110 if not validators.domain(domain):
111 print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
113 elif domain.endswith(".arpa"):
114 print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
116 elif blacklist.is_blacklisted(domain):
117 # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
119 elif instances.is_recent(domain):
120 # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
125 print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
126 federation.fetch_instances(domain, blocker, None, command)
128 except network.exceptions as exception:
129 print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
130 instances.set_last_error(domain, exception)
132 # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")