1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
22 from fba import database
24 from fba.helpers import blacklist
25 from fba.helpers import tidyup
# Module-level logger used by every function below; the root logger is
# configured at INFO level as a side effect of importing this module.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Set the reason of an existing block row that has no reason yet.

    Runs an UPDATE on the ``blocks`` table for the row matching ``blocker``,
    ``blocked`` and ``block_level`` whose ``reason`` is NULL or empty, and
    refreshes ``last_seen`` in the same statement.

    Args:
        reason: Reason text to store; ``None`` is accepted as well.
        blocker: Value matched against the ``blocker`` column. Must be a
            non-empty, all lower-case string.
        blocked: Value matched against the ``blocked`` column. Must be a
            non-empty, all lower-case string.
        block_level: Value matched against the ``block_level`` column. Must
            be a non-empty string other than ``"accept"``.

    Raises:
        ValueError: If any parameter has the wrong type, is empty, is not
            lower-case, or if ``block_level`` is ``"accept"``.
    """
    # Imported locally so the function does not rely on a module-level import.
    import time

    logger.debug(
        "reason='%s',blocker=%s,blocked=%s,block_level=%s - CALLED!",
        reason, blocker, blocked, block_level,
    )

    if not isinstance(reason, str) and reason is not None:
        raise ValueError(f"Parameter reason[]='{type(reason)}' is not 'str'")

    # blocker and blocked share the same three checks, applied in this order.
    for name, value in (("blocker", blocker), ("blocked", blocked)):
        if not isinstance(value, str):
            raise ValueError(f"Parameter {name}[]='{type(value)}' is not 'str'")
        if value == "":
            raise ValueError(f"Parameter '{name}' is empty")
        if value.lower() != value:
            raise ValueError(f"Parameter {name}='{value}' must be all lower-case")

    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
    if block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    if block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    # logging formats lazily with %-style arguments; passing extra arguments
    # without placeholders makes the logging module report a formatting error.
    logger.debug(
        "Updating block reason: reason='%s',blocker='%s',blocked='%s',block_level='%s'",
        reason, blocker, blocked, block_level,
    )

    # NOTE(review): assumes last_seen holds a Unix timestamp (time.time());
    # confirm against the blocks table schema.
    # NOTE(review): "UPDATE ... LIMIT" needs database support for that clause.
    database.cursor.execute(
        "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND (reason IS NULL OR reason = '') LIMIT 1",
        (reason, time.time(), blocker, blocked, block_level),
    )

    logger.debug("EXIT!")
def update_last_seen(blocker: str, blocked: str, block_level: str):
    """Refresh the ``last_seen`` value of an existing block row.

    Runs an UPDATE on the ``blocks`` table for the row matching ``blocker``,
    ``blocked`` and ``block_level``.

    Args:
        blocker: Value matched against the ``blocker`` column. Must be a
            non-empty, all lower-case string.
        blocked: Value matched against the ``blocked`` column. Must be a
            non-empty, all lower-case string.
        block_level: Value matched against the ``block_level`` column. Must
            be a non-empty string other than ``"accept"``.

    Raises:
        ValueError: If any parameter has the wrong type, is empty, is not
            lower-case, or if ``block_level`` is ``"accept"``.
    """
    # Imported locally so the function does not rely on a module-level import.
    import time

    # Use %-style placeholders: logging formats lazily, and extra arguments
    # without placeholders cause a formatting error inside the logging module.
    logger.debug(
        "Updating last_seen for: blocker='%s',blocked='%s',block_level='%s'",
        blocker, blocked, block_level,
    )

    # blocker and blocked share the same three checks, applied in this order.
    for name, value in (("blocker", blocker), ("blocked", blocked)):
        if not isinstance(value, str):
            raise ValueError(f"Parameter {name}[]='{type(value)}' is not 'str'")
        if value == "":
            raise ValueError(f"Parameter '{name}' is empty")
        if value.lower() != value:
            raise ValueError(f"Parameter {name}='{value}' must be all lower-case")

    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
    if block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    if block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    # NOTE(review): assumes last_seen holds a Unix timestamp (time.time());
    # confirm against the blocks table schema.
    database.cursor.execute(
        "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
        (time.time(), blocker, blocked, block_level),
    )

    logger.debug("EXIT!")
def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
    """Report whether a matching row exists in the ``blocks`` table.

    Args:
        blocker: Value matched against the ``blocker`` column. Must be a
            non-empty, all lower-case string.
        blocked: Value matched against the ``blocked`` column. Must be a
            non-empty, all lower-case string.
        block_level: Value matched against the ``block_level`` column. Must
            be a non-empty string other than ``"accept"``.

    Returns:
        ``True`` when the query yields a row, ``False`` otherwise.

    Raises:
        ValueError: If any parameter has the wrong type, is empty, is not
            lower-case, or if ``block_level`` is ``"accept"``.
    """
    logger.debug(
        "blocker=%s,blocked=%s,block_level=%s - CALLED!",
        blocker, blocked, block_level,
    )

    # blocker and blocked share the same three checks, applied in this order.
    for name, value in (("blocker", blocker), ("blocked", blocked)):
        if not isinstance(value, str):
            raise ValueError(f"Parameter {name}[]='{type(value)}' is not of type 'str'")
        if value == "":
            raise ValueError(f"Parameter '{name}' is empty")
        if value.lower() != value:
            raise ValueError(f"Parameter {name}='{value}' must be all lower-case")

    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    if block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    if block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    database.cursor.execute(
        "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
        (blocker, blocked, block_level),
    )

    # Any fetched row means the block is already recorded.
    is_blocked = database.cursor.fetchone() is not None

    logger.debug("is_blocked='%s' - EXIT!", is_blocked)
    return is_blocked
def add_instance(blocker: str, blocked: str, reason: str, block_level: str):
    """Insert a new row into the ``blocks`` table.

    The parameters are validated first; ``reason`` is passed through
    ``tidyup.reason()`` when it is not ``None``; then a row is inserted with
    ``first_seen`` and ``last_seen`` set to the same timestamp.

    Args:
        blocker: Non-empty, all lower-case string whose part before the
            first ``/`` must pass ``validators.domain()``.
        blocked: Non-empty, all lower-case string whose part before the
            first ``/`` must pass ``validators.domain()``.
        reason: Reason text, or ``None`` when no reason is known.
        block_level: Non-empty string other than ``"accept"``.

    Raises:
        ValueError: If a parameter has the wrong type, is empty, is not
            lower-case, fails domain validation, or if ``block_level`` is
            ``"accept"``.
        Exception: If ``blocker`` or ``blocked`` is blacklisted.
    """
    # Imported locally so the function does not rely on a module-level import.
    import time

    # Use %-style placeholders: logging formats lazily, and extra arguments
    # without placeholders cause a formatting error inside the logging module.
    logger.debug(
        "blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!",
        blocker, blocked, reason, block_level,
    )

    # Guard clauses run in a fixed order so the first failing check decides
    # which error the caller sees.
    if not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    if blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    if blocker.lower() != blocker:
        raise ValueError(f"Parameter blocker='{blocker}' must be all lower-case")
    if not validators.domain(blocker.split("/")[0]):
        raise ValueError(f"Bad blocker='{blocker}'")
    if not isinstance(blocked, str):
        raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
    if blocked == "":
        raise ValueError("Parameter 'blocked' is empty")
    if blocked.lower() != blocked:
        raise ValueError(f"Parameter blocked='{blocked}' must be all lower-case")
    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    if block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    if not validators.domain(blocked.split("/")[0]):
        raise ValueError(f"Bad blocked='{blocked}'")
    if blacklist.is_blacklisted(blocker):
        raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
    if blacklist.is_blacklisted(blocked):
        raise Exception(f"blocked='{blocked}' is blacklisted but function invoked")
    if block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    if reason is not None:
        # Maybe needs cleaning
        reason = tidyup.reason(reason)

    logger.info(
        "New block: blocker='%s',blocked='%s',reason='%s',block_level='%s'",
        blocker, blocked, reason, block_level,
    )

    # One timestamp for both columns so a new row starts with
    # first_seen == last_seen.
    # NOTE(review): assumes both columns hold Unix timestamps (time.time());
    # confirm against the blocks table schema.
    now = time.time()
    database.cursor.execute(
        "INSERT INTO blocks (blocker, blocked, reason, block_level, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
        (blocker, blocked, reason, block_level, now, now),
    )

    logger.debug("EXIT!")