1 # Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
2 # Copyright (C) 2023 Free Software Foundation
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published
6 # by the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 from fba import blacklist
23 from fba.helpers import tidyup
25 def update_reason(reason: str, blocker: str, blocked: str, block_level: str):
26 # DEBUG: print(f"DEBUG: reason='{reason}',blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
27 if not isinstance(reason, str) and reason is not None:
28 raise ValueError(f"Parameter reason[]='{type(reason)}' is not 'str'")
29 elif not isinstance(blocker, str):
30 raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
32 raise ValueError("Parameter 'blocker' is empty")
33 elif not isinstance(blocked, str):
34 raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
36 raise ValueError("Parameter 'blocked' is empty")
37 elif not isinstance(block_level, str):
38 raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
39 elif block_level == "":
40 raise ValueError("Parameter 'block_level' is empty")
42 # DEBUG: print("DEBUG: Updating block reason:", reason, blocker, blocked, block_level)
45 "UPDATE blocks SET reason = ?, last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? AND reason IN ('','unknown') LIMIT 1",
55 # DEBUG: print(f"DEBUG: fba.cursor.rowcount={fba.cursor.rowcount}")
56 if fba.cursor.rowcount == 0:
57 # DEBUG: print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',reason='{reason}' - EXIT!")
60 except BaseException as exception:
61 print(f"ERROR: failed SQL query: reason='{reason}',blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exception[{type(exception)}]:'{str(exception)}'")
64 # DEBUG: print("DEBUG: EXIT!")
66 def update_last_seen(blocker: str, blocked: str, block_level: str):
67 # DEBUG: print("DEBUG: Updating last_seen for:", blocker, blocked, block_level)
68 if not isinstance(blocker, str):
69 raise ValueError(f"Parameter blocker[]='{type(blocker)}' is not 'str'")
71 raise ValueError("Parameter 'blocker' is empty")
72 elif not isinstance(blocked, str):
73 raise ValueError(f"Parameter blocked[]='{type(blocked)}' is not 'str'")
75 raise ValueError("Parameter 'blocked' is empty")
76 elif not isinstance(block_level, str):
77 raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not 'str'")
78 elif block_level == "":
79 raise ValueError("Parameter 'block_level' is empty")
83 "UPDATE blocks SET last_seen = ? WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
92 # DEBUG: print(f"DEBUG: fba.cursor.rowcount={fba.cursor.rowcount}")
93 if fba.cursor.rowcount == 0:
94 # DEBUG: print(f"DEBUG: Did not update any rows: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}' - EXIT!")
97 except BaseException as exception:
98 print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',block_level='{block_level}',exception[{type(exception)}]:'{str(exception)}'")
101 # DEBUG: print("DEBUG: EXIT!")
103 def is_instance_blocked(blocker: str, blocked: str, block_level: str) -> bool:
104 # DEBUG: print(f"DEBUG: blocker={blocker},blocked={blocked},block_level={block_level} - CALLED!")
105 if not isinstance(blocker, str):
106 raise ValueError(f"Parameter blocker[]={type(blocker)} is not of type 'str'")
108 raise ValueError("Parameter 'blocker' is empty")
109 elif not isinstance(blocked, str):
110 raise ValueError(f"Parameter blocked[]={type(blocked)} is not of type 'str'")
112 raise ValueError("Parameter 'blocked' is empty")
113 elif not isinstance(block_level, str):
114 raise ValueError(f"Parameter block_level[]={type(block_level)} is not of type 'str'")
115 elif block_level == "":
116 raise ValueError("Parameter 'block_level' is empty")
119 "SELECT * FROM blocks WHERE blocker = ? AND blocked = ? AND block_level = ? LIMIT 1",
127 is_blocked = fba.cursor.fetchone() is not None
129 # DEBUG: print(f"DEBUG: is_blocked='{is_blocked}' - EXIT!")
132 def add_instance(blocker: str, blocked: str, reason: str, block_level: str):
133 # DEBUG: print("DEBUG: blocker,blocked,reason,block_level:", blocker, blocked, reason, block_level)
134 if not isinstance(blocker, str):
135 raise ValueError(f"Parameter blocker[]={type(blocker)} is not 'str'")
137 raise ValueError("Parameter 'blocker' is empty")
138 elif not validators.domain(blocker.split("/")[0]):
139 raise ValueError(f"Bad blocker='{blocker}'")
140 elif not isinstance(blocked, str):
141 raise ValueError(f"Parameter blocked[]={type(blocked)} is not 'str'")
143 raise ValueError("Parameter 'blocked' is empty")
144 elif not validators.domain(blocked.split("/")[0]):
145 raise ValueError(f"Bad blocked='{blocked}'")
146 elif blacklist.is_blacklisted(blocker):
147 raise Exception(f"blocker='{blocker}' is blacklisted but function invoked")
148 elif blacklist.is_blacklisted(blocked):
149 raise Exception(f"blocked='{blocked}' is blacklisted but function invoked")
151 if reason is not None:
152 # Maybe needs cleaning
153 reason = tidyup.reason(reason)
155 print(f"INFO: New block: blocker='{blocker}',blocked='{blocked}', reason='{reason}', block_level='{block_level}'")
158 "INSERT INTO blocks (blocker, blocked, reason, block_level, first_seen, last_seen) VALUES(?, ?, ?, ?, ?, ?)",
168 except BaseException as exception:
169 print(f"ERROR: failed SQL query: blocker='{blocker}',blocked='{blocked}',reason='{reason}',block_level='{block_level}',exception[{type(exception)}]:'{str(exception)}'")
172 # DEBUG: print("DEBUG: EXIT!")