1 # Copyright (C) 2023 Free Software Foundation
3 # This program is free software: you can redistribute it and/or modify
4 # it under the terms of the GNU Affero General Public License as published
5 # by the Free Software Foundation, either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Affero General Public License for more details.
13 # You should have received a copy of the GNU Affero General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
18 from fba.helpers import domain as domain_helper
20 from fba.http import federation
21 from fba.http import network
23 from fba.models import blocks
24 from fba.models import instances
# Module-level logging setup, executed once when this module is imported:
# basicConfig() is called with level INFO, then a logger named after this
# module (__name__) is created for the functions in this file to use.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def domain(domain: str, blocker: str, command: str) -> bool:
    """Fetch instances for a domain that was reported by a blocker.

    The domain is passed through ``deobfuscate()``, pending updates for the
    blocker are flushed, and, unless the domain is unwanted or was checked
    recently, ``federation.fetch_instances()`` is invoked for it.

    Args:
        domain: Domain name to process; checked with
            ``domain_helper.raise_on()`` before use.
        blocker: Domain of the instance that reported ``domain``; checked
            with ``domain_helper.raise_on()`` as well.
        command: Non-empty string that is forwarded to
            ``federation.fetch_instances()`` and included in log messages.

    Returns:
        True when ``federation.fetch_instances()`` completed without one of
        the exceptions in ``network.exceptions``; False when the domain was
        skipped or fetching failed.

    Raises:
        ValueError: If ``command`` is not a ``str`` or is an empty string.
        Whatever ``domain_helper.raise_on()`` raises for ``domain`` or
        ``blocker`` (presumably on invalid domains - confirm in the helper).
    """
    logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
    domain_helper.raise_on(domain)
    domain_helper.raise_on(blocker)

    if not isinstance(command, str):
        raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
    # The emptiness error must only fire for an empty string; without this
    # guard every call would fail here.
    if command == "":
        raise ValueError("Parameter 'command' is empty")

    logger.debug("domain='%s' - BEFORE!", domain)
    domain = deobfuscate(domain, blocker)

    logger.debug("domain='%s' - DEOBFUSCATED!", domain)
    if instances.has_pending(blocker):
        logger.debug("Flushing updates for blocker='%s' ...", blocker)
        instances.update_data(blocker)

    # Skipped domains are reported as "not processed".
    # NOTE(review): returning False here follows from the "-> bool" contract
    # and the "SKIPPED!" log messages - confirm against callers.
    if not is_domain_wanted(domain):
        logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
        return False
    if instances.is_recent(domain):
        logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
        return False

    # Stays False unless the fetch below finishes without a network error.
    processed = False

    try:
        logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
        federation.fetch_instances(domain, blocker, None, command)
        processed = True
    except network.exceptions as exception:
        # Best effort: record the failure on the instance instead of
        # propagating it to the caller.
        logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
        instances.set_last_error(domain, exception)

    # Flush pending updates for the domain whether or not the fetch succeeded.
    logger.debug("Checking if domain='%s' has pending updates ...", domain)
    if instances.has_pending(domain):
        logger.debug("Flushing updates for domain='%s' ...", domain)
        instances.update_data(domain)

    logger.debug("processed='%s' - EXIT!", processed)
    return processed
def block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
    """Record that ``blocker`` blocks ``blocked`` at the given block level.

    When ``blocks.is_instance_blocked()`` reports no existing entry, a new
    one is created with ``blocks.add_instance()``; otherwise the existing
    entry is refreshed with ``blocks.update_last_seen()``.

    Args:
        blocker: Domain of the blocking instance; checked with
            ``domain_helper.raise_on()``.
        blocked: Domain of the blocked instance; checked with
            ``domain_helper.raise_on()``.
        reason: Block reason as a string. ``None`` is also accepted by the
            type check below.
        block_level: Non-empty string naming the block level.

    Returns:
        True when a new block entry was added, False when an existing entry
        was only refreshed.

    Raises:
        ValueError: If ``reason`` is neither ``str`` nor ``None``, or if
            ``block_level`` is not a ``str`` or is empty.
        Whatever ``domain_helper.raise_on()`` raises for ``blocker`` or
        ``blocked`` (presumably on invalid domains - confirm in the helper).
    """
    logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
    domain_helper.raise_on(blocker)
    domain_helper.raise_on(blocked)

    if not isinstance(reason, str) and reason is not None:
        raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
    if not isinstance(block_level, str):
        raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
    if block_level == "":
        raise ValueError("Parameter block_level is empty")

    # Only flipped to True on the "new entry" path below.
    added = False

    if not blocks.is_instance_blocked(blocker, blocked, block_level):
        logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
        blocks.add_instance(blocker, blocked, reason, block_level)
        added = True
    else:
        # NOTE(review): the two paths are treated as mutually exclusive, so
        # a freshly added entry is not refreshed right away - confirm intent.
        logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
        blocks.update_last_seen(blocker, blocked, block_level)

    logger.debug("added='%s' - EXIT!", added)
    return added