1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import time

from fba import database

from fba.helpers import blacklist
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup
# Configure the root logger at INFO level as soon as this module is imported,
# then create the module-level logger used by every function below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
    """Set the reason on an existing block record that has no reason yet.

    Only a row matching ``blocker``/``blocked``/``block_level`` whose
    ``reason`` is NULL or empty is touched; its ``last_seen`` column is
    refreshed to the current Unix time in the same statement.

    Args:
        reason: Block reason to store, or ``None``.
        blocker: Domain of the blocking instance (non-empty, lower-case).
        blocked: Domain of the blocked instance (non-empty, lower-case).
        block_level: Block level of the record (non-empty, not "accept").

    Raises:
        ValueError: If any parameter has the wrong type, is empty, is not
            lower-case, or ``block_level`` is "accept".
    """
    logger.debug("reason='%s',blocker='%s',blocked='%s',block_level='%s' - CALLED!", reason, blocker, blocked, block_level)

    if not isinstance(reason, str) and reason is not None:
        raise ValueError(f"Parameter reason[]='{type(reason)}' is not 'str'")
    elif not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif blocker.lower() != blocker:
        raise ValueError(f"Parameter blocker='{blocker}' must be all lower-case")
    elif not isinstance(blocked, str):
        raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
    elif blocked == "":
        raise ValueError("Parameter 'blocked' is empty")
    elif blocked.lower() != blocked:
        raise ValueError(f"Parameter blocked='{blocked}' must be all lower-case")
    elif not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
    elif block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    elif block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    logger.debug("Updating block reason='%s',blocker='%s',blocked='%s',block_level='%s'", reason, blocker, blocked, block_level)

    # NOTE(review): "UPDATE ... LIMIT" is not standard SQL; if the backend is
    # SQLite it needs a build with SQLITE_ENABLE_UPDATE_DELETE_LIMIT - confirm.
    # NOTE(review): no commit is issued here; presumably the caller commits.
    database.cursor.execute(
        "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND (reason IS NULL OR reason = '') LIMIT 1",
        [
            reason,
            time.time(),
            blocker,
            blocked,
            block_level,
        ]
    )

    logger.debug("EXIT!")
def update_last_seen(blocker: str, blocked: str, block_level: str):
    """Refresh the ``last_seen`` timestamp of an existing block record.

    The row matching ``blocker``/``blocked``/``block_level`` gets its
    ``last_seen`` column set to the current Unix time.

    Args:
        blocker: Domain of the blocking instance (non-empty, lower-case).
        blocked: Domain of the blocked instance (non-empty, lower-case).
        block_level: Block level of the record (non-empty, not "accept").

    Raises:
        ValueError: If any parameter has the wrong type, is empty, is not
            lower-case, or ``block_level`` is "accept".
    """
    logger.debug("blocker='%s',blocked='%s',block_level='%s' - CALLED!", blocker, blocked, block_level)

    if not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif blocker.lower() != blocker:
        raise ValueError(f"Parameter blocker='{blocker}' must be all lower-case")
    elif not isinstance(blocked, str):
        raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
    elif blocked == "":
        raise ValueError("Parameter 'blocked' is empty")
    elif blocked.lower() != blocked:
        raise ValueError(f"Parameter blocked='{blocked}' must be all lower-case")
    elif not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
    elif block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    elif block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    # NOTE(review): "UPDATE ... LIMIT" is not standard SQL; if the backend is
    # SQLite it needs a build with SQLITE_ENABLE_UPDATE_DELETE_LIMIT - confirm.
    # NOTE(review): no commit is issued here; presumably the caller commits.
    database.cursor.execute(
        "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
        [
            time.time(),
            blocker,
            blocked,
            block_level,
        ]
    )

    logger.debug("EXIT!")
def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
    """Check whether a block record exists for the given triple.

    Args:
        blocker: Domain of the blocking instance (non-empty, lower-case).
        blocked: Domain of the blocked instance (non-empty, lower-case).
        block_level: Block level to look for (non-empty, not "accept").

    Returns:
        ``True`` if at least one row in ``blocks`` matches
        ``blocker``/``blocked``/``block_level``, otherwise ``False``.

    Raises:
        ValueError: If any parameter has the wrong type, is empty, is not
            lower-case, or ``block_level`` is "accept".
    """
    logger.debug("blocker='%s',blocked='%s',block_level='%s' - CALLED!", blocker, blocked, block_level)

    if not isinstance(blocker, str):
        raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not of type 'str'")
    elif blocker == "":
        raise ValueError("Parameter 'blocker' is empty")
    elif blocker.lower() != blocker:
        raise ValueError(f"Parameter blocker='{blocker}' must be all lower-case")
    elif not isinstance(blocked, str):
        raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not of type 'str'")
    elif blocked == "":
        raise ValueError("Parameter 'blocked' is empty")
    elif blocked.lower() != blocked:
        raise ValueError(f"Parameter blocked='{blocked}' must be all lower-case")
    elif not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    elif block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    elif block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    database.cursor.execute(
        "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
        (
            blocker,
            blocked,
            block_level,
        )
    )

    # Any fetched row means the block is already recorded.
    is_blocked = database.cursor.fetchone() is not None

    logger.debug("is_blocked='%s' - EXIT!", is_blocked)
    return is_blocked
def add_instance(blocker: str, blocked: str, reason: str, block_level: str):
    """Insert a new block record into the ``blocks`` table.

    ``first_seen`` and ``last_seen`` are both set to the same current Unix
    time. A non-``None`` reason is passed through ``tidyup.reason()`` before
    being stored.

    Args:
        blocker: Domain of the blocking instance; checked by
            ``domain_helper.raise_on()`` (presumably raises on an invalid
            domain - see that helper).
        blocked: Domain of the blocked instance; checked the same way.
        reason: Block reason, or ``None`` when no reason is known.
        block_level: Block level of the record (non-empty, not "accept").

    Raises:
        ValueError: If ``block_level`` is not a ``str``, is empty or is
            "accept".
        Exception: If ``blocker`` or ``blocked`` is blacklisted.
    """
    logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
    domain_helper.raise_on(blocker)
    domain_helper.raise_on(blocked)

    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    elif block_level == "":
        raise ValueError("Parameter 'block_level' is empty")
    elif blacklist.is_blacklisted(blocker):
        raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
    elif blacklist.is_blacklisted(blocked):
        raise Exception(f"blocked='{blocked}' is blacklisted but function invoked")
    elif block_level == "accept":
        raise ValueError("Accepted domains are not wanted here")

    if reason is not None:
        # Maybe needs cleaning
        reason = tidyup.reason(reason)

    logger.info("New block: blocker='%s',blocked='%s',reason='%s',block_level='%s'", blocker, blocked, reason, block_level)

    # Take the timestamp once so first_seen and last_seen are identical on insert.
    now = time.time()

    # NOTE(review): no commit is issued here; presumably the caller commits.
    database.cursor.execute(
        "INSERT INTO blocks (blocker, blocked, reason, block_level, first_seen, last_seen) VALUES (?, ?, ?, ?, ?, ?)",
        [
            blocker,
            blocked,
            reason,
            block_level,
            now,
            now,
        ]
    )

    logger.debug("EXIT!")