1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging

from fba import utils

from fba.helpers import blacklist
from fba.helpers import domain as domain_helper

from fba.http import federation
from fba.http import network

from fba.models import blocks
from fba.models import instances
# Module-level logger, named after this module so records can be traced
# back to their origin.
logger = logging.getLogger(__name__)

# Root logging configuration: emit INFO and above (debug output stays off
# unless the level is lowered elsewhere).
logging.basicConfig(level=logging.INFO)
def domain(name: str, blocker: str, command: str) -> bool:
    """Fetch instances for a domain that was reported by a blocker.

    The name is first run through ``utils.deobfuscate()``. Unless the
    resulting domain is unwanted or has been checked recently, it is handed
    to ``federation.fetch_instances()``. Pending instance updates for
    ``blocker`` and for the (deobfuscated) ``name`` are flushed on the way.

    Args:
        name: Domain name to process; checked with
            ``domain_helper.raise_on()`` before anything else happens.
        blocker: Domain of the instance that reported ``name``; checked
            with ``domain_helper.raise_on()`` as well.
        command: Non-empty string forwarded to
            ``federation.fetch_instances()`` and included in log messages.

    Returns:
        True when ``federation.fetch_instances()`` finished without one of
        the ``network.exceptions``; False when the domain was skipped or
        the fetch raised such an exception.

    Raises:
        ValueError: If ``command`` is not a ``str`` or is an empty string.
            Whatever ``domain_helper.raise_on()`` raises for ``name`` or
            ``blocker`` propagates unchanged.
    """
    logger.debug("name='%s',blocker='%s',command='%s' - CALLED!", name, blocker, command)
    domain_helper.raise_on(name)
    domain_helper.raise_on(blocker)

    if not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
    if command == "":
        raise ValueError("Parameter 'command' is empty")

    logger.debug("name='%s' - BEFORE!", name)
    name = utils.deobfuscate(name, blocker)

    logger.debug("name='%s' - DEOBFUSCATED!", name)
    if instances.has_pending(blocker):
        logger.debug("Flushing updates for blocker='%s' ...", blocker)
        instances.update(blocker)

    # Skip early: nothing is fetched for unwanted or recently checked domains.
    if not domain_helper.is_wanted(name):
        logger.debug("name='%s' is not wanted - SKIPPED!", name)
        return False
    if instances.is_recent(name):
        logger.debug("name='%s' has been recently checked - SKIPPED!", name)
        return False

    processed = False
    try:
        logger.info("Fetching instances for name='%s',blocker='%s',command='%s' ...", name, blocker, command)
        federation.fetch_instances(name, blocker, None, command)
        processed = True
    except network.exceptions as exception:
        # Best effort: a network failure is recorded on the instance and
        # reported through the return value instead of aborting the caller.
        logger.warning("Exception '%s' during fetching instances (%s) from name='%s'", type(exception), command, name)
        instances.set_last_error(name, exception)

    # Flush regardless of success so a recorded last error is not left pending.
    logger.debug("Checking if name='%s' has pending updates ...", name)
    if instances.has_pending(name):
        logger.debug("Flushing updates for name='%s' ...", name)
        instances.update(name)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
    """Record that ``blocker`` blocks ``blocked`` at the given block level.

    If ``blocks.is_instance_blocked()`` reports no existing entry, a new one
    is created through ``blocks.add()``. Otherwise the existing entry is
    refreshed through ``blocks.update_last_seen()``.

    Args:
        blocker: Domain of the blocking instance; checked with
            ``domain_helper.raise_on()``.
        blocked: Domain of the blocked instance; checked with
            ``domain_helper.raise_on()`` and must not be blacklisted.
        reason: Block reason as a string, or ``None`` when no reason is
            available.
        block_level: Non-empty string naming the block level.

    Returns:
        True when a new block entry was added, False when an existing entry
        was only refreshed.

    Raises:
        ValueError: If ``reason`` is neither ``str`` nor ``None``, if
            ``block_level`` is not a non-empty ``str``, or if ``blocked`` is
            blacklisted. Whatever ``domain_helper.raise_on()`` raises for
            ``blocker`` or ``blocked`` propagates unchanged.
    """
    logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
    domain_helper.raise_on(blocker)
    domain_helper.raise_on(blocked)

    if not isinstance(reason, str) and reason is not None:
        raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    if block_level == "":
        raise ValueError("Parameter block_level is empty")
    if blacklist.is_blacklisted(blocked):
        # Callers are expected to filter blacklisted domains beforehand.
        raise ValueError(f"blocked='{blocked}' is blacklisted but function was invoked")

    added = False
    if not blocks.is_instance_blocked(blocker, blocked, block_level):
        logger.debug("Invoking blocks.add(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
        blocks.add(blocker, blocked, reason, block_level)
        added = True
    else:
        # Entry already exists: only refresh it, never add a duplicate.
        logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
        blocks.update_last_seen(blocker, blocked, block_level)

    logger.debug("added='%s' - EXIT!", added)
    return added