]> git.mxchange.org Git - fba.git/blob - fba/fba.py
Continued:
[fba.git] / fba / fba.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import sqlite3
18
19 from urllib.parse import urlparse
20
21 import requests
22 import validators
23
24 from fba import blacklist
25 from fba import federation
26 from fba import network
27
28 from fba.models import instances
29
30 # Connect to database
31 connection = sqlite3.connect("blocks.db")
32 cursor = connection.cursor()
33
34 ##### Other functions #####
35
36 def is_primitive(var: any) -> bool:
37     # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
38     return type(var) in {int, str, float, bool} or var is None
39
40 def get_hash(domain: str) -> str:
41     # DEBUG: print(f"DEBUG: domain='{domain}' - CALLED!")
42     if not isinstance(domain, str):
43         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
44     elif domain == "":
45         raise ValueError("Parameter 'domain' is empty")
46
47     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
48
49 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
50     # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
51     if not isinstance(url, str):
52         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
53     elif url == "":
54         raise ValueError("Parameter 'url' is empty")
55     elif not isinstance(headers, dict):
56         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
57     elif not isinstance(timeout, tuple):
58         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
59
60     # DEBUG: print(f"DEBUG: Parsing url='{url}'")
61     components = urlparse(url)
62
63     # Invoke other function, avoid trailing ?
64     # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
65     if components.query != "":
66         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
67     else:
68         response = network.fetch_response(components.netloc, f"{components.path}", headers, timeout)
69
70     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
71     return response
72
73 def process_domain(domain: str, blocker: str, command: str) -> bool:
74     # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
75     if not isinstance(domain, str):
76         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
77     elif domain == "":
78         raise ValueError("Parameter 'domain' is empty")
79     elif not isinstance(blocker, str):
80         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
81     elif blocker == "":
82         raise ValueError("Parameter 'blocker' is empty")
83     elif not isinstance(command, str):
84         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
85     elif command == "":
86         raise ValueError("Parameter 'command' is empty")
87
88     if domain.find("*") > 0:
89         # Try to de-obscure it
90         row = instances.deobscure("*", domain)
91
92         # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
93         if row is None:
94             print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
95             return False
96
97         # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
98         domain = row[0]
99     elif domain.find("?") > 0:
100         # Try to de-obscure it
101         row = instances.deobscure("?", domain)
102
103         # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
104         if row is None:
105             print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
106             return False
107
108         # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
109         domain = row[0]
110
111     if not validators.domain(domain):
112         print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
113         return False
114     elif domain.endswith(".arpa"):
115         print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
116         return False
117     elif blacklist.is_blacklisted(domain):
118         # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
119         return False
120     elif instances.is_recent(domain):
121         # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
122         return False
123
124     processed = False
125     try:
126         print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
127         federation.fetch_instances(domain, blocker, None, command)
128         processed = True
129     except network.exceptions as exception:
130         print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
131         instances.set_last_error(domain, exception)
132
133     # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")
134     return processed