1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
19 from urllib.parse import urlparse
24 from fba import blacklist
25 from fba import federation
26 from fba import network
28 from fba.models import instances
# Module-level SQLite handles, created as a side effect of importing this module.
# "blocks.db" is a relative path, so it is resolved against the current working directory.
connection = sqlite3.connect("blocks.db")
# Shared cursor on the connection above; it is not referenced by the functions visible in this section.
cursor = connection.cursor()
34 ##### Other functions #####
def is_primitive(var: object) -> bool:
    """Tell whether ``var`` is a plain scalar value.

    Args:
        var: Any value.

    Returns:
        True when ``var`` is ``None`` or its exact type is ``int``, ``str``,
        ``float`` or ``bool``; False otherwise.
    """
    # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
    # Compares the exact type, so instances of subclasses are not treated as primitive.
    return type(var) in {int, str, float, bool} or var is None
def get_hash(domain: str) -> str:
    """Return the hex-encoded SHA-256 digest of a domain name.

    Args:
        domain: Non-empty domain name; it is encoded as UTF-8 before hashing.

    Returns:
        The hexadecimal SHA-256 digest of ``domain``.

    Raises:
        ValueError: If ``domain`` is not a ``str`` or is an empty string.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        # Without this guard the "is empty" error is unreachable (or raised unconditionally).
        raise ValueError("Parameter 'domain' is empty")

    return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def fetch_url(url: str, headers: dict, timeout: tuple) -> "requests.models.Response":
    """Fetch ``url`` by delegating to ``network.fetch_response``.

    The URL is split with ``urlparse``; its network location is passed as the
    first argument and its path (plus the query string, when there is one) as
    the second argument of ``network.fetch_response``.

    Args:
        url: Non-empty URL to fetch.
        headers: Headers handed through unchanged to ``network.fetch_response``.
        timeout: Timeout tuple handed through unchanged to ``network.fetch_response``.

    Returns:
        Whatever ``network.fetch_response`` returns.

    Raises:
        ValueError: If ``url`` is not a non-empty ``str``, ``headers`` is not a
            ``dict`` or ``timeout`` is not a ``tuple``.
    """
    # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")

    # DEBUG: print(f"DEBUG: Parsing url='{url}'")
    components = urlparse(url)

    # Invoke other function, avoid trailing ?
    # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
    if components.query != "":
        path = f"{components.path}?{components.query}"
    else:
        # An empty path is requested as the root document.
        path = components.path or "/"

    response = network.fetch_response(components.netloc, path, headers, timeout)

    # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXIT!")
    return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """Validate ``domain`` and, when it is eligible, fetch its instances.

    A domain containing ``*`` or ``?`` is first passed to
    ``instances.deobscure``. The resulting domain is skipped when it fails
    ``validators.domain``, ends with ``.arpa``, is blacklisted or has been
    checked recently. Otherwise ``federation.fetch_instances`` is invoked.

    Args:
        domain: Non-empty domain name, possibly containing ``*`` or ``?``.
        blocker: Non-empty string passed to ``federation.fetch_instances``.
        command: Non-empty string passed to ``federation.fetch_instances``.

    Returns:
        True when ``federation.fetch_instances`` finished without raising one
        of ``network.exceptions``; False when the domain was skipped or the
        fetch failed.

    Raises:
        ValueError: If any parameter is not a ``str`` or is an empty string.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    # Only the first wildcard character found is handled ("*" takes precedence over "?").
    # Membership test instead of find() > 0, which missed a wildcard at index 0.
    for wildcard in ("*", "?"):
        if wildcard in domain:
            # Try to de-obscure it
            row = instances.deobscure(wildcard, domain)

            # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
            # NOTE(review): assumes deobscure() returns None when nothing matches - confirm.
            if row is None:
                print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
                return False

            # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
            domain = row[0]
            break

    if not validators.domain(domain):
        print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
        return False
    elif domain.endswith(".arpa"):
        print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
        return False
    elif blacklist.is_blacklisted(domain):
        # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
        return False
    elif instances.is_recent(domain):
        # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
        return False

    print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
    try:
        federation.fetch_instances(domain, blocker, None, command)
    except network.exceptions as exception:
        # NOTE(review): the message hard-codes "fetch_oliphant" although the caller is given in 'command'.
        print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
        instances.set_last_error(domain, exception)
        processed = False
    else:
        processed = True

    # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")
    return processed