]> git.mxchange.org Git - fba.git/blob - fba/fba.py
Continued:
[fba.git] / fba / fba.py
1 # Copyright (C) 2023 Free Software Foundation
2 #
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Affero General Public License for more details.
12 #
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 import hashlib
17 import sqlite3
18
19 from urllib.parse import urlparse
20
21 import requests
22 import validators
23
24 from fba import blacklist
25 from fba import federation
26 from fba import network
27
28 from fba.models import instances
29
30 # Connect to database
31 connection = sqlite3.connect("blocks.db")
32 cursor = connection.cursor()
33
34 ##### Other functions #####
35
36 def is_primitive(var: any) -> bool:
37     # DEBUG: print(f"DEBUG: var[]='{type(var)}' - CALLED!")
38     return type(var) in {int, str, float, bool} or var is None
39
40 def get_hash(domain: str) -> str:
41     if not isinstance(domain, str):
42         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
43     elif domain == "":
44         raise ValueError("Parameter 'domain' is empty")
45
46     return hashlib.sha256(domain.encode("utf-8")).hexdigest()
47
48 def fetch_url(url: str, headers: dict, timeout: tuple) -> requests.models.Response:
49     # DEBUG: print(f"DEBUG: url='{url}',headers()={len(headers)},timeout={timeout} - CALLED!")
50     if not isinstance(url, str):
51         raise ValueError(f"Parameter url[]='{type(url)}' is not 'str'")
52     elif url == "":
53         raise ValueError("Parameter 'url' is empty")
54     elif not isinstance(headers, dict):
55         raise ValueError(f"Parameter headers[]='{type(headers)}' is not 'dict'")
56     elif not isinstance(timeout, tuple):
57         raise ValueError(f"Parameter timeout[]='{type(timeout)}' is not 'tuple'")
58
59     # DEBUG: print(f"DEBUG: Parsing url='{url}'")
60     components = urlparse(url)
61
62     # Invoke other function, avoid trailing ?
63     # DEBUG: print(f"DEBUG: components[{type(components)}]={components}")
64     if components.query != "":
65         response = network.fetch_response(components.netloc, f"{components.path}?{components.query}", headers, timeout)
66     else:
67         response = network.fetch_response(components.netloc, f"{components.path}", headers, timeout)
68
69     # DEBUG: print(f"DEBUG: response[]='{type(response)}' - EXXIT!")
70     return response
71
72 def process_domain(domain: str, blocker: str, command: str) -> bool:
73     # DEBUG: print(f"DEBUG: domain='{domain}',blocker='{blocker}',command='{command}' - CALLED!")
74     if not isinstance(domain, str):
75         raise ValueError(f"Parameter domain[]='{type(domain)}' is not 'str'")
76     elif domain == "":
77         raise ValueError("Parameter 'domain' is empty")
78     elif not isinstance(blocker, str):
79         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
80     elif blocker == "":
81         raise ValueError("Parameter 'blocker' is empty")
82     elif not isinstance(command, str):
83         raise ValueError(f"Parameter command[]='{type(command)}' is not 'str'")
84     elif command == "":
85         raise ValueError("Parameter 'command' is empty")
86
87     if domain.find("*") > 0:
88         # Try to de-obscure it
89         row = instances.deobscure("*", domain)
90
91         # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
92         if row is None:
93             print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
94             return False
95
96         # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
97         domain = row[0]
98     elif domain.find("?") > 0:
99         # Try to de-obscure it
100         row = instances.deobscure("?", domain)
101
102         # DEBUG: print(f"DEBUG: row[{type(row)}]='{row}'")
103         if row is None:
104             print(f"WARNING: Cannot de-obfucate domain='{domain}' - SKIPPED!")
105             return False
106
107         # DEBUG: print(f"DEBUG: domain='{domain}' de-obscured to '{row[0]}'")
108         domain = row[0]
109
110     if not validators.domain(domain):
111         print(f"WARNING: domain='{domain}' is not a valid domain - SKIPPED!")
112         return False
113     elif domain.endswith(".arpa"):
114         print(f"WARNING: domain='{domain}' is a reversed .arpa domain and should not be used generally.")
115         return False
116     elif blacklist.is_blacklisted(domain):
117         # DEBUG: print(f"DEBUG: domain='{domain}' is blacklisted - SKIPPED!")
118         return False
119     elif instances.is_recent(domain):
120         # DEBUG: print(f"DEBUG: domain='{domain}' has been recently checked - SKIPPED!")
121         return False
122
123     processed = False
124     try:
125         print(f"INFO: Fetching instances for instane='{domain}',blocker='{blocker}',command='{command}' ...")
126         federation.fetch_instances(domain, blocker, None, command)
127         processed = True
128     except network.exceptions as exception:
129         print(f"WARNING: Exception '{type(exception)}' during fetching instances (fetch_oliphant) from domain='{domain}'")
130         instances.set_last_error(domain, exception)
131
132     # DEBUG: print(f"DEBUG: processed='{processed}' - EXIT!")
133     return processed