1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
23 from urllib.parse import urlparse
27 from fba import config
28 from fba import network
# Pending errors, keyed by domain, that still need to be written to the database.
# NOTE(review): the assignment was missing next to this comment; a dict is
# assumed because remove_pending_error() removes entries by domain key.
pending_errors = {}

# Module-wide SQLite connection and cursor (log_error() writes through the cursor)
connection = sqlite3.connect("blocks.db")
cursor = connection.cursor()
# Compiled patterns for version numbers; remove_version() tries them in order.
# Raw strings are used so that "\." and "\d" reach the regex engine without
# relying on Python passing unknown string escapes through.
patterns = [
    # semantic version number (with optional v|V prefix)
    re.compile(r"^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"),
    # non-semantic, e.g. 1.2.3.4
    re.compile(r"^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(\.(?P<subpatch>0|[1-9]\d*))?)$"),
    # non-semantic, e.g. 2023.05[-dev]
    re.compile(r"^(?P<year>[1-9]{1}[0-9]{3})\.(?P<month>[0-9]{2})(-dev){0,1}$"),
    # non-semantic, e.g. abcdef0 (exactly seven lowercase hex digits)
    re.compile(r"^[a-f0-9]{7}$"),
]
50 ##### Other functions #####
def is_primitive(var: object) -> bool:
    """Tell whether var is a primitive value.

    A value counts as primitive when it is None or when its exact type is
    int, str, float or bool. The check uses type(), not isinstance(), so
    instances of subclasses of these types are not primitive.

    Args:
        var: Any value.

    Returns:
        True for None and for exact int/str/float/bool values, else False.
    """
    # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
    return var is None or type(var) in {int, str, float, bool}
def remove_version(software: str) -> str:
    """Remove a trailing version number from a software name.

    Only the part in front of the first ";", "," or " - " is examined. Its
    last token (split on " ", "/" or "-", in that order of preference) is
    taken as the version candidate and checked against the module-level
    patterns. When it matches, the candidate and the single separator
    character in front of it are cut off; anything from a " version" suffix
    onwards is removed as well.

    Args:
        software: Software name, possibly followed by a version number.

    Returns:
        The name without the version number. The input is returned unchanged
        when it contains neither "." nor " ", when no separator is found, or
        when the candidate matches none of the patterns.
    """
    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
    if "." not in software and " " not in software:
        print(f"WARNING: software='{software}' does not contain a version number.")
        return software

    # Ignore everything after the first ";", "," or " - "
    temp = software
    for delimiter in (";", ",", " - "):
        if delimiter in software:
            temp = software.split(delimiter)[0]
            break

    # DEBUG: print(f"DEBUG: software='{software}'")
    # The version candidate is the last token of temp. The separator is looked
    # up in temp itself, because temp is the string that gets split.
    for separator in (" ", "/", "-"):
        if separator in temp:
            version = temp.split(separator)[-1]
            break
    else:
        # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
        return software

    # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...")
    if not any(pattern.match(version) for pattern in patterns):
        print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
        return software

    # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...")
    # The extra -1 drops the one-character separator in front of the version
    end = len(temp) - len(version) - 1

    # DEBUG: print(f"DEBUG: end[{type(end)}]={end}")
    software = temp[0:end].strip()
    if " version" in software:
        # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'")
        software = strip_until(software, " version")

    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
    return software
def strip_powered_by(software: str) -> str:
    """Return what follows "powered by" in software, cut at the first " - ".

    Args:
        software: Text expected to contain "powered by".

    Returns:
        The stripped text after the first "powered by", truncated by
        strip_until() at " - ". The input is returned unchanged (after a
        warning) when it does not contain "powered by".

    Raises:
        ValueError: If software is not a str or is empty.
    """
    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
    if not isinstance(software, str):
        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
    elif software == "":
        raise ValueError("Parameter 'software' is empty")
    elif "powered by" not in software:
        print(f"WARNING: Cannot find 'powered by' in software='{software}'!")
        return software

    # Search for exactly the text the guard above tested, so find() can never
    # return -1 here; surrounding whitespace is removed by strip() afterwards.
    marker = "powered by"
    start = software.find(marker)
    # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'")

    software = software[start + len(marker):].strip()
    # DEBUG: print(f"DEBUG: software='{software}'")

    software = strip_until(software, " - ")

    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
    return software
def strip_hosted_on(software: str) -> str:
    """Return what precedes "hosted on" in software, cut at the first " - ".

    Args:
        software: Text expected to contain "hosted on".

    Returns:
        The stripped text in front of the first "hosted on", truncated by
        strip_until() at " - ". The input is returned unchanged (after a
        warning) when it does not contain "hosted on".

    Raises:
        ValueError: If software is not a str or is empty.
    """
    # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
    if not isinstance(software, str):
        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
    elif software == "":
        raise ValueError("Parameter 'software' is empty")
    elif "hosted on" not in software:
        print(f"WARNING: Cannot find 'hosted on' in '{software}'!")
        return software

    # Search for exactly the text the guard above tested, so find() can never
    # return -1 here.
    end = software.find("hosted on")
    # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")

    # Slice with ":" - indexing a str with the tuple (0, end) raises TypeError
    software = software[:end].strip()
    # DEBUG: print(f"DEBUG: software='{software}'")

    software = strip_until(software, " - ")

    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
    return software
def strip_until(software: str, until: str) -> str:
    """Return the part of software in front of the first occurrence of until.

    Args:
        software: Text to truncate.
        until: Marker at which the text is cut; the marker itself is dropped.

    Returns:
        The text before the first occurrence of until, with surrounding
        whitespace stripped. The input is returned unchanged (after a
        warning) when until does not occur in it.

    Raises:
        ValueError: If software or until is not a str or is empty.
    """
    # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
    if not isinstance(software, str):
        raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
    elif software == "":
        raise ValueError("Parameter 'software' is empty")
    elif not isinstance(until, str):
        raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'")
    elif until == "":
        raise ValueError("Parameter 'until' is empty")

    # One scan: partition() yields an empty separator when until is absent
    head, found, _ = software.partition(until)
    if found == "":
        print(f"WARNING: Cannot find '{until}' in '{software}'!")
        return software

    software = head.strip()

    # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
    return software
def remove_pending_error(domain: str):
    """Drop the pending error recorded for domain, if there is one.

    Args:
        domain: Domain whose entry is removed from pending_errors.

    Raises:
        ValueError: If domain is not a str or is empty.
    """
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    # Prevent updating any pending errors, nodeinfo was found.
    # pop() with a default keeps a domain without a pending error from
    # raising KeyError.
    pending_errors.pop(domain, None)

    # DEBUG: print("DEBUG: EXIT!")
def get_hash(domain: str) -> str:
    """Return the SHA-256 hex digest of the UTF-8 encoded domain.

    Args:
        domain: Non-empty domain name.

    Returns:
        The hexadecimal SHA-256 digest as a str.

    Raises:
        ValueError: If domain is not a str or is empty.
    """
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")

    return hashlib.sha256(domain.encode("utf-8")).hexdigest()
def log_error(domain: str, response: requests.models.Response):
    """Record a failed request for domain in the error_log table.

    Nothing is written unless the configuration value "write_error_log" is
    "true" (case-insensitive). After the insert, rows older than the
    configured "error_log_cleanup" distance are deleted.

    Args:
        domain: Domain the error belongs to.
        response: What went wrong. Besides a response object, an exception
            or a plain str is accepted; exceptions are converted to a str
            and str messages are stored with error code 999.

    Raises:
        ValueError: If domain is not a str or is empty.
    """
    # DEBUG: print("DEBUG: domain,response[]:", domain, type(response))
    if not isinstance(domain, str):
        raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
    elif domain == "":
        raise ValueError("Parameter 'domain' is empty")
    elif config.get("write_error_log").lower() != "true":
        # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
        return

    try:
        # DEBUG: print("DEBUG: BEFORE response[]:", type(response))
        # Every exception (JSONDecodeError included) derives from BaseException
        if isinstance(response, BaseException):
            response = f"response[{type(response)}]='{str(response)}'"

        # DEBUG: print("DEBUG: AFTER response[]:", type(response))
        if isinstance(response, str):
            cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
                domain,
                response,
                time.time()
            ])
        else:
            # NOTE(review): response.reason as error_message is an assumption,
            # confirm against the error_log schema and callers
            cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
                domain,
                response.status_code,
                response.reason,
                time.time()
            ])

        # Cleanup old entries
        # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
        cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
    except Exception as exception:
        # Exception, not BaseException: KeyboardInterrupt and SystemExit must
        # not be swallowed by the error logger
        print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")

    # DEBUG: print("DEBUG: EXIT!")
def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
    """Fetch url by splitting it and delegating to network.fetch_response().

    Only the hostname, path and query string of the parsed URL are passed
    on; scheme, port and fragment are not forwarded.

    Args:
        url: Non-empty URL to fetch.
        headers: Request headers, handed through unchanged.
        timeout: Timeout tuple, handed through unchanged.

    Returns:
        Whatever network.fetch_response() returns.

    Raises:
        ValueError: If url is not a non-empty str, headers is not a dict or
            timeout is not a tuple.
    """
    # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
    if not isinstance(url, str):
        raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
    elif url == "":
        raise ValueError("Parameter 'url' is empty")
    elif not isinstance(headers, dict):
        raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
    elif not isinstance(timeout, tuple):
        raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")

    # DEBUG: print(f"DEBUG: Parsing url='{url}'")
    components = urlparse(url)

    # Append the query string only when there is one, to avoid a trailing "?"
    # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
    path = components.path
    if components.query != "":
        path = f"{path}?{components.query}"

    response = network.fetch_response(components.hostname, path, headers, timeout)

    # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXIT!")
    return response