]> git.mxchange.org Git - fba.git/blob - fba/models/blocks.py
Continued:
[fba.git] / fba / models / blocks.py
1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
16
17 import sys
18 import time
19 import validators
20
21 from fba import blacklist
22 from fba import fba
23 from fba.helpers import tidyup
24
25 def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
26     # DEBUG: print(f"DEBUG: reason='{reason}',blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
27     if not isinstance(reason, str) and reason is not None:
28         raise ValueError(f"Parameter reason[]='{type(reason)}' is not 'str'")
29     elif not isinstance(blocker, str):
30         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
31     elif blocker == "":
32         raise ValueError("Parameter 'blocker' is empty")
33     elif not isinstance(blocked, str):
34         raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
35     elif blocked == "":
36         raise ValueError("Parameter 'blocked' is empty")
37     elif not isinstance(block_level, str):
38         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
39     elif block_level == "":
40         raise ValueError("Parameter 'block_level' is empty")
41     elif block_level == "accept":
42         raise ValueError("Accepted domains are not wanted here")
43
44     # DEBUG: print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
45     fba.cursor.execute(
46         "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND (reason IS NULL OR reason = '') LIMIT 1",
47         [
48             reason,
49             time.time(),
50             blocker,
51             blocked,
52             block_level
53         ])
54
55     # DEBUG: print(f"DEBUG: fba.cursor.rowcount={fba.cursor.rowcount}")
56     if fba.cursor.rowcount == 0:
57         raise Exception(f"Did not update any rows: blocker='{blocker}'")
58
59     # DEBUG: print("DEBUG: EXIT!")
60
61 def update_last_seen(blocker: str, blocked: str, block_level: str):
62     # DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
63     if not isinstance(blocker, str):
64         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
65     elif blocker == "":
66         raise ValueError("Parameter 'blocker' is empty")
67     elif not isinstance(blocked, str):
68         raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
69     elif blocked == "":
70         raise ValueError("Parameter 'blocked' is empty")
71     elif not isinstance(block_level, str):
72         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
73     elif block_level == "":
74         raise ValueError("Parameter 'block_level' is empty")
75     elif block_level == "accept":
76         raise ValueError("Accepted domains are not wanted here")
77
78     fba.cursor.execute(
79         "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
80         [
81             time.time(),
82             blocker,
83             blocked,
84             block_level
85         ])
86
87     # DEBUG: print(f"DEBUG: fba.cursor.rowcount={fba.cursor.rowcount}")
88     if fba.cursor.rowcount == 0:
89         raise Exception(f"Did not update any rows: blocker='{blocker}'")
90
91     # DEBUG: print("DEBUG: EXIT!")
92
93 def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
94     # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
95     if not isinstance(blocker, str):
96         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not of type 'str'")
97     elif blocker == "":
98         raise ValueError("Parameter 'blocker' is empty")
99     elif not isinstance(blocked, str):
100         raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not of type 'str'")
101     elif blocked == "":
102         raise ValueError("Parameter 'blocked' is empty")
103     elif not isinstance(block_level, str):
104         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
105     elif block_level == "":
106         raise ValueError("Parameter 'block_level' is empty")
107     elif block_level == "accept":
108         raise ValueError("Accepted domains are not wanted here")
109
110     fba.cursor.execute(
111         "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
112         (
113             blocker,
114             blocked,
115             block_level
116         ),
117     )
118
119     is_blocked = fba.cursor.fetchone() is not None
120
121     # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!")
122     return is_blocked
123
124 def add_instance(blocker: str, blocked: str, reason: str, block_level: str):
125     # DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
126     if not isinstance(blocker, str):
127         raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
128     elif blocker == "":
129         raise ValueError("Parameter 'blocker' is empty")
130     elif not validators.domain(blocker.split("/")[0]):
131         raise ValueError(f"Bad blocker='{blocker}'")
132     elif not isinstance(blocked, str):
133         raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
134     elif blocked == "":
135         raise ValueError("Parameter 'blocked' is empty")
136     elif not isinstance(block_level, str):
137         raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
138     elif block_level == "":
139         raise ValueError("Parameter 'block_level' is empty")
140     elif not validators.domain(blocked.split("/")[0]):
141         raise ValueError(f"Bad blocked='{blocked}'")
142     elif blacklist.is_blacklisted(blocker):
143         raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
144     elif blacklist.is_blacklisted(blocked):
145         raise Exception(f"blocked='{blocked}' is blacklisted but function invoked")
146     elif block_level == "accept":
147         raise ValueError("Accepted domains are not wanted here")
148
149     if reason is not None:
150         # Maybe needs cleaning
151         reason = tidyup.reason(reason)
152
153     print(f"INFO: New block: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}'")
154
155     fba.cursor.execute(
156         "INSERT INTO blocks (blocker, blocked, reason, block_level, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
157         [
158              blocker,
159              blocked,
160              reason,
161              block_level,
162              time.time(),
163              time.time()
164         ])
165
166     # DEBUG: print("DEBUG: EXIT!")