]> git.mxchange.org Git - fba.git/blob - fba/fba.py
Continued:
[fba.git] / fba / fba.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import re
18 import json
19 import sqlite3
20 import sys
21 import time
22
23 from urllib.parse import urlparse
24
25 import requests
26
27 from fba import config
28 from fba import network
29
30 # Array with pending errors needed to be written to database
31 pending_errors = {
32 }
33
34 # Connect to database
35 connection = sqlite3.connect("blocks.db")
36 cursor = connection.cursor()
37
38 # Pattern instance for version numbers
39 patterns = [
40     # semantic version number (with v|V) prefix)
41     re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"),
42     # non-sematic, e.g. 1.2.3.4
43     re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(\.(?P<subpatch>0|[1-9]\d*))?)$"),
44     # non-sematic, e.g. 2023-05[-dev]
45     re.compile("^(?P<year>[1-9]{1}[0-9]{3})\.(?P<month>[0-9]{2})(-dev){0,1}$"),
46     # non-semantic, e.g. abcdef0
47     re.compile("^[a-f0-9]{7}$"),
48 ]
49
50 ##### Other functions #####
51
52 def is_primitive(var: any) -> bool:
53     print(f"DEBUG: var[]='{type(var)}' - CALLED!")
54     return type(var) in {int, str, float, bool} or var is None
55
56 def remove_version(software: str) -> str:
57     print(f"DEBUG: software='{software}' - CALLED!")
58     if not "." in software and " " not in software:
59         print(f"WARNING: software='{software}' does not contain a version number.")
60         return software
61
62     temp = software
63     if ";" in software:
64         temp = software.split(";")[0]
65     elif "," in software:
66         temp = software.split(",")[0]
67     elif " - " in software:
68         temp = software.split(" - ")[0]
69
70     print(f"DEBUG: software='{software}'")
71     version = None
72     if " " in software:
73         version = temp.split(" ")[-1]
74     elif "/" in software:
75         version = temp.split("/")[-1]
76     elif "-" in software:
77         version = temp.split("-")[-1]
78     else:
79         print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
80         return software
81
82     match = None
83     print(f"DEBUG: Checking {len(patterns)} patterns ...")
84     for pattern in patterns:
85         # Run match()
86         match = pattern.match(version)
87
88         print(f"DEBUG: match[]={type(match)}")
89         if isinstance(match, re.Match):
90             print(f"DEBUG: version='{version}' is matching pattern='{pattern}'")
91             break
92
93     print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
94     if not isinstance(match, re.Match):
95         print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
96         return software
97
98     print(f"DEBUG: Found valid version number: '{version}', removing it ...")
99     end = len(temp) - len(version) - 1
100
101     print(f"DEBUG: end[{type(end)}]={end}")
102     software = temp[0:end].strip()
103     if " version" in software:
104         print(f"DEBUG: software='{software}' contains word ' version'")
105         software = strip_until(software, " version")
106
107     print(f"DEBUG: software='{software}' - EXIT!")
108     return software
109
110 def strip_powered_by(software: str) -> str:
111     print(f"DEBUG: software='{software}' - CALLED!")
112     if not isinstance(software, str):
113         raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
114     elif software == "":
115         raise ValueError("Parameter 'software' is empty")
116     elif "powered by" not in software:
117         print(f"WARNING: Cannot find 'powered by' in software='{software}'!")
118         return software
119
120     start = software.find("powered by ")
121     print(f"DEBUG: start[{type(start)}]='{start}'")
122
123     software = software[start + 11:].strip()
124     print(f"DEBUG: software='{software}'")
125
126     software = strip_until(software, " - ")
127
128     print(f"DEBUG: software='{software}' - EXIT!")
129     return software
130
131 def strip_hosted_on(software: str) -> str:
132     print(f"DEBUG: software='{software}' - CALLED!")
133     if not isinstance(software, str):
134         raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
135     elif software == "":
136         raise ValueError("Parameter 'software' is empty")
137     elif "hosted on" not in software:
138         print(f"WARNING: Cannot find 'hosted on' in '{software}'!")
139         return software
140
141     end = software.find("hosted on ")
142     print(f"DEBUG: end[{type(end)}]='{end}'")
143
144     software = software[0, end].strip()
145     print(f"DEBUG: software='{software}'")
146
147     software = strip_until(software, " - ")
148
149     print(f"DEBUG: software='{software}' - EXIT!")
150     return software
151
152 def strip_until(software: str, until: str) -> str:
153     print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
154     if not isinstance(software, str):
155         raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
156     elif software == "":
157         raise ValueError("Parameter 'software' is empty")
158     elif not isinstance(until, str):
159         raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'")
160     elif until == "":
161         raise ValueError("Parameter 'until' is empty")
162     elif not until in software:
163         print(f"WARNING: Cannot find '{until}' in '{software}'!")
164         return software
165
166     # Next, strip until part
167     end = software.find(until)
168
169     print(f"DEBUG: end[{type(end)}]='{end}'")
170     if end > 0:
171         software = software[0:end].strip()
172
173     print(f"DEBUG: software='{software}' - EXIT!")
174     return software
175
176 def remove_pending_error(domain: str):
177     if not isinstance(domain, str):
178         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
179     elif domain == "":
180         raise ValueError("Parameter 'domain' is empty")
181
182     try:
183         # Prevent updating any pending errors, nodeinfo was found
184         del pending_errors[domain]
185
186     except:
187         pass
188
189     print("DEBUG: EXIT!")
190
191 def get_hash(domain: str) -> str:
192     if not isinstance(domain, str):
193         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
194     elif domain == "":
195         raise ValueError("Parameter 'domain' is empty")
196
197     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
198
199 def log_error(domain: str, response: requests.models.Response):
200     print("DEBUG: domain,response[]:", domain, type(response))
201     if not isinstance(domain, str):
202         raise ValueError(f"Parameter domain[]={type(domain)} is not 'str'")
203     elif domain == "":
204         raise ValueError("Parameter 'domain' is empty")
205     elif config.get("write_error_log").lower() != "true":
206         print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
207         return
208
209     try:
210         print("DEBUG: BEFORE response[]:", type(response))
211         if isinstance(response, BaseException) or isinstance(response, json.decoder.JSONDecodeError):
212             response = f"response[{type(response)}]='{str(response)}'"
213
214         print("DEBUG: AFTER response[]:", type(response))
215         if isinstance(response, str):
216             cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
217                 domain,
218                 response,
219                 time.time()
220             ])
221         else:
222             cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
223                 domain,
224                 response.status_code,
225                 response.reason,
226                 time.time()
227             ])
228
229         # Cleanup old entries
230         print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
231         cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
232     except BaseException as exception:
233         print(f"ERROR: failed SQL query: domain='{domain}',exception[{type(exception)}]:'{str(exception)}'")
234         sys.exit(255)
235
236     print("DEBUG: EXIT!")
237
238 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
239     print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
240     if not isinstance(url, str):
241         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
242     elif url == "":
243         raise ValueError("Parameter 'url' is empty")
244     elif not isinstance(headers, dict):
245         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
246     elif not isinstance(timeout, tuple):
247         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
248
249     print(f"DEBUG: Parsing url='{url}'")
250     components = urlparse(url)
251
252     # Invoke other function, avoid trailing ?
253     print(f"DEBUG: components[{type(components)}]={components}")
254     if components.query != "":
255         response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
256     else:
257         response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
258
259     print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
260     return response