# Copyright (C) 2023 Free Software Foundation
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging

from fba import utils

from fba.helpers import domain as domain_helper

from fba.http import federation
from fba.http import network

from fba.models import blocks
from fba.models import instances
# Configure root logging at INFO level when this module is imported.
# NOTE(review): the functions below log mostly via logger.debug(); those
# records stay hidden unless the level is lowered elsewhere - confirm intent.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)
def domain(name: str, blocker: str, command: str) -> bool:
    """Fetch instances for domain `name` as reported by `blocker`.

    The domain is deobfuscated first, pending updates for `blocker` are
    flushed, and the fetch is skipped when the resulting domain is not
    wanted or was checked recently.

    Args:
        name: Domain to process; validated by ``domain_helper.raise_on()``
            and then passed through ``utils.deobfuscate()``.
        blocker: Domain that reported `name`; validated the same way.
        command: Non-empty string handed on to
            ``federation.fetch_instances()``.

    Returns:
        True when ``federation.fetch_instances()`` completed without raising
        one of ``network.exceptions``; False when the domain was skipped or
        the fetch failed.

    Raises:
        ValueError: If `command` is not a string or is empty.
        Exception: Whatever ``domain_helper.raise_on()`` raises for an
            invalid `name` or `blocker`.
    """
    logger.debug("name='%s',blocker='%s',command='%s' - CALLED!", name, blocker, command)
    domain_helper.raise_on(name)
    domain_helper.raise_on(blocker)

    if not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
    elif command == "":
        raise ValueError("Parameter 'command' is empty")

    logger.debug("name='%s' - BEFORE!", name)
    name = utils.deobfuscate(name, blocker)

    logger.debug("name='%s' - DEOBFUSCATED!", name)
    if instances.has_pending(blocker):
        logger.debug("Flushing updates for blocker='%s' ...", blocker)
        instances.update_data(blocker)

    # Guard clauses: nothing to fetch for unwanted or recently checked domains.
    if not utils.is_domain_wanted(name):
        logger.debug("name='%s' is not wanted - SKIPPED!", name)
        return False
    elif instances.is_recent(name):
        logger.debug("name='%s' has been recently checked - SKIPPED!", name)
        return False

    # Only flipped to True once the fetch finished without a network error.
    processed = False

    try:
        logger.info("Fetching instances for name='%s',blocker='%s',command='%s' ...", name, blocker, command)
        federation.fetch_instances(name, blocker, None, command)
        processed = True
    except network.exceptions as exception:
        # Best effort: record the failure on the instance instead of aborting.
        # NOTE(review): network.exceptions is presumably a tuple of exception
        # classes - confirm against fba.http.network.
        logger.warning("Exception '%s' during fetching instances (%s) from name='%s'", type(exception), command, name)
        instances.set_last_error(name, exception)

    # Flush anything queued for this domain, whether the fetch succeeded or not.
    logger.debug("Checking if name='%s' has pending updates ...", name)
    if instances.has_pending(name):
        logger.debug("Flushing updates for name='%s' ...", name)
        instances.update_data(name)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
    """Record that `blocker` blocks `blocked` at `block_level`.

    A block that is not yet known is added through ``blocks.add()``; an
    already known one only has its last-seen information refreshed through
    ``blocks.update_last_seen()``.

    Args:
        blocker: Domain imposing the block; validated by
            ``domain_helper.raise_on()``.
        blocked: Domain being blocked; validated the same way.
        reason: Block reason; a string or None.
        block_level: Non-empty string naming the block level.

    Returns:
        True when a new block record was added, False when an existing
        record was only updated.

    Raises:
        ValueError: If `reason` is neither a string nor None, or if
            `block_level` is not a string or is empty.
        Exception: Whatever ``domain_helper.raise_on()`` raises for an
            invalid `blocker` or `blocked`.
    """
    logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
    domain_helper.raise_on(blocker)
    domain_helper.raise_on(blocked)

    if not isinstance(reason, str) and reason is not None:
        raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
    elif not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    elif block_level == "":
        raise ValueError("Parameter block_level is empty")

    # Reports whether a new record was created (as opposed to refreshed).
    added = False

    if not blocks.is_instance_blocked(blocker, blocked, block_level):
        logger.debug("Invoking blocks.add(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
        blocks.add(blocker, blocked, reason, block_level)
        added = True
    else:
        # Known block: only refresh its last-seen information.
        logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
        blocks.update_last_seen(blocker, blocked, block_level)

    logger.debug("added='%s' - EXIT!", added)
    return added