]> git.mxchange.org Git - fba.git/blob - fba/fba.py
8eb5cd53277a0ceff094751726554c196be1180f
[fba.git] / fba / fba.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import json
18 import sqlite3
19 import time
20
21 from urllib.parse import urlparse
22
23 import requests
24 import validators
25
26 from fba import blacklist
27 from fba import config
28 from fba import federation
29 from fba import instances
30 from fba import network
31
32 # Connect to database
33 connection = sqlite3.connect("blocks.db")
34 cursor = connection.cursor()
35
36 ##### Other functions #####
37
38 def is_primitive(var: any) -> bool:
39     # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
40     return type(var) in {int, str, float, bool} or var is None
41
42 def get_hash(domain: str) -> str:
43     if not isinstance(domain, str):
44         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
45     elif domain == "":
46         raise ValueError("Parameter 'domain' is empty")
47
48     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
49
50 def log_error(domain: str, error: dict):
51     # DEBUG: print("DEBUG: domain,error[]:", domain, type(error))
52     if not isinstance(domain, str):
53         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
54     elif domain == "":
55         raise ValueError("Parameter 'domain' is empty")
56     elif config.get("write_error_log").lower() != "true":
57         # DEBUG: print(f"DEBUG: Writing to error_log is disabled in configuruation file - EXIT!")
58         return
59
60     # DEBUG: print("DEBUG: BEFORE error[]:", type(error))
61     if isinstance(error, BaseException, error, json.decoder.JSONDecodeError):
62         error = f"error[{type(error)}]='{str(error)}'"
63
64     # DEBUG: print("DEBUG: AFTER error[]:", type(error))
65     if isinstance(error, str):
66         cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, 999, ?, ?)",[
67             domain,
68             error,
69             time.time()
70         ])
71     else:
72         cursor.execute("INSERT INTO error_log (domain, error_code, error_message, created) VALUES (?, ?, ?, ?)",[
73             domain,
74             error["status_code"],
75             error["error_message"],
76             time.time()
77         ])
78
79     # Cleanup old entries
80     # DEBUG: print(f"DEBUG: Purging old records (distance: {config.get('error_log_cleanup')})")
81     cursor.execute("DELETE FROM error_log WHERE created < ?", [time.time() - config.get("error_log_cleanup")])
82
83     # DEBUG: print("DEBUG: EXIT!")
84
85 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
86     # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
87     if not isinstance(url, str):
88         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
89     elif url == "":
90         raise ValueError("Parameter 'url' is empty")
91     elif not isinstance(headers, dict):
92         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
93     elif not isinstance(timeout, tuple):
94         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
95
96     # DEBUG: print(f"DEBUG: Parsing url='{url}'")
97     components = urlparse(url)
98
99     # Invoke other function, avoid trailing ?
100     # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
101     if components.query != "":
102         response = network.fetch_response(components.hostname, f"{components.path}?{components.query}", headers, timeout)
103     else:
104         response = network.fetch_response(components.hostname, f"{components.path}", headers, timeout)
105
106     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
107     return response
108
109 def process_domain(domain: str, blocker: str, command: str) -> bool:
110     # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
111     if not isinstance(domain, str):
112         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
113     elif domain == "":
114         raise ValueError("Parameter 'domain' is empty")
115     elif not isinstance(blocker, str):
116         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
117     elif blocker == "":
118         raise ValueError("Parameter 'blocker' is empty")
119     elif not isinstance(command, str):
120         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
121     elif command == "":
122         raise ValueError("Parameter 'command' is empty")
123
124     if domain.find("*") > 0:
125         # Try to de-obscure it
126         row = instances.deobscure("*", domain)
127
128         # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
129         if row is None:
130             print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
131             return False
132
133         # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
134         domain = row[0]
135     elif domain.find("?") > 0:
136         # Try to de-obscure it
137         row = instances.deobscure("?", domain)
138
139         # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
140         if row is None:
141             print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
142             return False
143
144         # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
145         domain = row[0]
146
147     if not validators.domain(domain):
148         print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
149         return False
150     elif domain.split(".")[-1] == "arpa":
151         print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
152         return False
153     elif blacklist.is_blacklisted(domain):
154         # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
155         return False
156     elif instances.is_recent(domain):
157         # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
158         return False
159
160     try:
161         print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
162         federation.fetch_instances(domain, blocker, None, command)
163     except network.exceptions as exception:
164         print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
165         instances.update_last_error(domain, exception)
166         return False
167
168     # DEBUG: print(f"DEBUG: Success! - EXIT!")
169     return True