1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 from urllib.parse import urlparse
26 from fba import blacklist
27 from fba import config
28 from fba import federation
29 from fba import instances
30 from fba import network
# Open the SQLite database file "blocks.db" when this module is imported.
# The path is relative, so it is resolved against the current working directory.
connection = sqlite3.connect("blocks.db")
# Shared module-level cursor; log_error() below executes its statements through it.
cursor = connection.cursor()
36 ##### Other functions #####
def is_primitive(var: any) -> bool:
    """Tell whether a value is ``None`` or exactly an int, str, float or bool.

    The exact type is compared, so an instance of a subclass of one of those
    types is not reported as primitive.
    """
    # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
    if var is None:
        return True

    return type(var) in (int, str, float, bool)
def get_hash(domain: str) -> str:
    """Return the hexadecimal SHA-256 digest of a domain name.

    Args:
        domain: Non-empty domain name; it is encoded as UTF-8 before hashing.

    Returns:
        The hex digest string produced by ``hashlib.sha256``.

    Raises:
        ValueError: If ``domain`` is not a ``str`` or is an empty string.
    """
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        # Only an empty string is rejected here; any other string is hashed.
        raise ValueError("Parameter 'domain' is empty")

    return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def log_error(domain: str, error: dict):
    """Store an error for a domain in the ``error_log`` table and purge old rows.

    Nothing is written when the ``write_error_log`` configuration value is not
    the string "true" (compared case-insensitively).

    Args:
        domain: Non-empty domain name the error belongs to.
        error: Either an exception instance, an already formatted message
            string, or a mapping that provides an error code and an
            ``error_message`` entry.

    Raises:
        ValueError: If ``domain`` is not a ``str`` or is an empty string.
    """
    # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif config.get("write_error_log").lower() != "true":
        # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuration file - EXIT!")
        return

    # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
    # isinstance() accepts exactly two arguments. BaseException already covers
    # every exception class (including json.decoder.JSONDecodeError), so a
    # single check is sufficient to flatten exceptions into a message string.
    if isinstance(error, BaseException):
        error = f"error[{type(error)}]='{str(error)}'"

    # DEBUG: print("DEBUG: AFTER error[]:", type(error))
    if isinstance(error, str):
        # Plain messages carry no code of their own; 999 is stored for them.
        cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)", [
            domain,
            error,
            time.time(),
        ])
    else:
        # NOTE(review): assumes the mapping stores its code under
        # "status_code" - confirm against the callers of this function.
        cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)", [
            domain,
            error["status_code"],
            error["error_message"],
            time.time(),
        ])

    # Remove entries older than the configured "error_log_cleanup" distance
    # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
    cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])

    # DEBUG: print("DEBUG: EXIT!")
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Fetch a URL by splitting it and delegating to ``network.fetch_response``.

    Only the hostname, the path and (when present) the query string of the
    parsed URL are forwarded; the other URL components are not passed on.

    Args:
        url: Non-empty URL to fetch.
        headers: Request headers, forwarded unchanged.
        timeout: Timeout tuple, forwarded unchanged.

    Returns:
        Whatever ``network.fetch_response`` returns for the request.

    Raises:
        ValueError: If ``url`` is not a non-empty ``str``, ``headers`` is not
            a ``dict`` or ``timeout`` is not a ``tuple``.
    """
    # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")

    # DEBUG: print(f"DEBUG: Parsing url='{url}'")
    components = urlparse(url)

    # Append the query only when there is one, to avoid a trailing "?"
    # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
    path = components.path
    if components.query != "":
        path = f"{path}?{components.query}"

    response = network.fetch_response(components.hostname, path, headers, timeout)

    # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXIT!")
    return response
def process_domain(domain: str, blocker: str, command: str) -> bool:
    """Validate a (possibly obscured) domain and fetch its instances.

    A domain containing "*" or "?" is first passed to ``instances.deobscure``.
    The resulting domain is skipped when it is not a valid domain name, ends
    in ".arpa", is blacklisted, or has been checked recently.

    Args:
        domain: Non-empty domain name, optionally obscured with "*" or "?".
        blocker: Non-empty name of the blocking instance, forwarded to
            ``federation.fetch_instances``.
        command: Non-empty command name, forwarded to
            ``federation.fetch_instances``.

    Returns:
        True when ``federation.fetch_instances`` finished without raising one
        of ``network.exceptions``; False when the domain was skipped or the
        fetch failed.

    Raises:
        ValueError: If any parameter is not a ``str`` or is an empty string.
    """
    # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    # "*" takes precedence over "?"; only the first wildcard found is handled.
    # A membership test is used so a wildcard in the very first position
    # (e.g. "*.example.com") is detected too, which "find(...) > 0" misses.
    for wildcard in ("*", "?"):
        if wildcard not in domain:
            continue

        # Try to de-obscure it
        # NOTE(review): assumes instances.deobscure() returns None when nothing
        # matches and otherwise a row whose first column is the real domain - confirm.
        row = instances.deobscure(wildcard, domain)

        # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
        if row is None:
            print(f"WARNING: Cannot de-obfucate domain='{domain}' - skipped!")
            return False

        # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
        domain = row[0]
        break

    if not validators.domain(domain):
        print(f"WARNING: domain='{domain}' is not a valid domain - skipped!")
        return False
    elif domain.split(".")[-1] == "arpa":
        print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
        return False
    elif blacklist.is_blacklisted(domain):
        # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - skipped!")
        return False
    elif instances.is_recent(domain):
        # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - skipped!")
        return False

    try:
        print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
        federation.fetch_instances(domain, blocker, None, command)
    except network.exceptions as exception:
        # Record the failure for this domain instead of aborting the caller
        print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
        instances.update_last_error(domain, exception)
        return False

    # DEBUG: print(f"DEBUG: Success! - EXIT!")
    return True