]> git.mxchange.org Git - fba.git/blob - fba/fba.py
4ae3c51cc3a237738078f5cabb3d964484d0c373
[fba.git] / fba / fba.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import re
18 import json
19 import sqlite3
20 import time
21
22 from urllib.parse import urlparse
23
24 import requests
25
26 from fba import config
27 from fba import network
28
29 # Connect to database
30 connection = sqlite3.connect("blocks.db")
31 cursor = connection.cursor()
32
33 # Pattern instance for version numbers
34 patterns = [
35     # semantic version number (with v|V) prefix)
36     re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?)?$"),
37     # non-sematic, e.g. 1.2.3.4
38     re.compile("^(?P<version>v|V{0,1})(\.{0,1})(?P<major>0|[1-9]\d*)\.(?P<minor>0+|[1-9]\d*)(\.(?P<patch>0+|[1-9]\d*)(\.(?P<subpatch>0|[1-9]\d*))?)$"),
39     # non-sematic, e.g. 2023-05[-dev]
40     re.compile("^(?P<year>[1-9]{1}[0-9]{3})\.(?P<month>[0-9]{2})(-dev){0,1}$"),
41     # non-semantic, e.g. abcdef0
42     re.compile("^[a-f0-9]{7}$"),
43 ]
44
45 ##### Other functions #####
46
47 def is_primitive(var: any) -> bool:
48     # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
49     return type(var) in {int, str, float, bool} or var is None
50
51 def remove_version(software: str) -> str:
52     # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
53     if not "." in software and " " not in software:
54         print(f"WARNING: software='{software}' does not contain a version number.")
55         return software
56
57     temp = software
58     if ";" in software:
59         temp = software.split(";")[0]
60     elif "," in software:
61         temp = software.split(",")[0]
62     elif " - " in software:
63         temp = software.split(" - ")[0]
64
65     # DEBUG: print(f"DEBUG: software='{software}'")
66     version = None
67     if " " in software:
68         version = temp.split(" ")[-1]
69     elif "/" in software:
70         version = temp.split("/")[-1]
71     elif "-" in software:
72         version = temp.split("-")[-1]
73     else:
74         # DEBUG: print(f"DEBUG: Was not able to find common seperator, returning untouched software='{software}'")
75         return software
76
77     match = None
78     # DEBUG: print(f"DEBUG: Checking {len(patterns)} patterns ...")
79     for pattern in patterns:
80         # Run match()
81         match = pattern.match(version)
82
83         # DEBUG: print(f"DEBUG: match[]='{type(match)}'")
84         if isinstance(match, re.Match):
85             # DEBUG: print(f"DEBUG: version='{version}' is matching pattern='{pattern}'")
86             break
87
88     # DEBUG: print(f"DEBUG: version[{type(version)}]='{version}',match='{match}'")
89     if not isinstance(match, re.Match):
90         print(f"WARNING: version='{version}' does not match regex, leaving software='{software}' untouched.")
91         return software
92
93     # DEBUG: print(f"DEBUG: Found valid version number: '{version}', removing it ...")
94     end = len(temp) - len(version) - 1
95
96     # DEBUG: print(f"DEBUG: end[{type(end)}]={end}")
97     software = temp[0:end].strip()
98     if " version" in software:
99         # DEBUG: print(f"DEBUG: software='{software}' contains word ' version'")
100         software = strip_until(software, " version")
101
102     # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
103     return software
104
105 def strip_powered_by(software: str) -> str:
106     # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
107     if not isinstance(software, str):
108         raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
109     elif software == "":
110         raise ValueError("Parameter 'software' is empty")
111     elif "powered by" not in software:
112         print(f"WARNING: Cannot find 'powered by' in software='{software}'!")
113         return software
114
115     start = software.find("powered by ")
116     # DEBUG: print(f"DEBUG: start[{type(start)}]='{start}'")
117
118     software = software[start + 11:].strip()
119     # DEBUG: print(f"DEBUG: software='{software}'")
120
121     software = strip_until(software, " - ")
122
123     # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
124     return software
125
126 def strip_hosted_on(software: str) -> str:
127     # DEBUG: print(f"DEBUG: software='{software}' - CALLED!")
128     if not isinstance(software, str):
129         raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
130     elif software == "":
131         raise ValueError("Parameter 'software' is empty")
132     elif "hosted on" not in software:
133         print(f"WARNING: Cannot find 'hosted on' in '{software}'!")
134         return software
135
136     end = software.find("hosted on ")
137     # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
138
139     software = software[0, end].strip()
140     # DEBUG: print(f"DEBUG: software='{software}'")
141
142     software = strip_until(software, " - ")
143
144     # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
145     return software
146
147 def strip_until(software: str, until: str) -> str:
148     # DEBUG: print(f"DEBUG: software='{software}',until='{until}' - CALLED!")
149     if not isinstance(software, str):
150         raise ValueError(f"Parameter software[]='{type(software)}' is not 'str'")
151     elif software == "":
152         raise ValueError("Parameter 'software' is empty")
153     elif not isinstance(until, str):
154         raise ValueError(f"Parameter until[]='{type(until)}' is not 'str'")
155     elif until == "":
156         raise ValueError("Parameter 'until' is empty")
157     elif not until in software:
158         print(f"WARNING: Cannot find '{until}' in '{software}'!")
159         return software
160
161     # Next, strip until part
162     end = software.find(until)
163
164     # DEBUG: print(f"DEBUG: end[{type(end)}]='{end}'")
165     if end > 0:
166         software = software[0:end].strip()
167
168     # DEBUG: print(f"DEBUG: software='{software}' - EXIT!")
169     return software
170
171 def get_hash(domain: str) -> str:
172     if not isinstance(domain, str):
173         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
174     elif domain == "":
175         raise ValueError("Parameter 'domain' is empty")
176
177     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
178
179 def log_error(domain: str, error: dict):
180     # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
181     if not isinstance(domain, str):
182         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
183     elif domain == "":
184         raise ValueError("Parameter 'domain' is empty")
185     elif config.get("write_error_log").lower() != "true":
186         # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
187         return
188
189     # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
190     if isinstance(error, BaseException) or isinstance(error, json.decoder.JSONDecodeError):
191         error = f"error[{type(error)}]='{str(error)}'"
192
193     # DEBUG: print("DEBUG: AFTER error[]:", type(error))
194     if isinstance(error, str):
195         cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
196             domain,
197             error,
198             time.time()
199         ])
200     else:
201         cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
202             domain,
203             error["status_code"],
204             error["error_message"],
205             time.time()
206         ])
207
208     # Cleanup old entries
209     # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
210     cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
211
212     # DEBUG: print("DEBUG: EXIT!")
213
214 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
215     # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
216     if not isinstance(url, str):
217         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
218     elif url == "":
219         raise ValueError("Parameter 'url' is empty")
220     elif not isinstance(headers, dict):
221         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
222     elif not isinstance(timeout, tuple):
223         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
224
225     # DEBUG: print(f"DEBUG: Parsing url='{url}'")
226     components = urlparse(url)
227
228     # Invoke other function, avoid trailing ?
229     # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
230     if components.query != "":
231         response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
232     else:
233         response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
234
235     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
236     return response