from fba.helpers import config
from fba.helpers import cookies
from fba.helpers import locking
+from fba.helpers import processing
from fba.helpers import software as software_helper
from fba.helpers import tidyup
block["block_level"] = utils.alias_block_level(block["block_level"])
- if utils.process_block(blocker, block["blocked"], block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
+ if processing.block(blocker, block["blocked"], block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], blocker)
blockdict.append({
"blocked": block["blocked"],
continue
logger.info("Adding new block: blocked='%s',block_level='%s'", blocked, block_level)
- if utils.process_block(blocker, blocked, None, block_level) and block_level == "reject" and config.get("bot_enabled"):
+ if processing.block(blocker, blocked, None, block_level) and block_level == "reject" and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", blocked, block_level, blocker)
blockdict.append({
"blocked": blocked,
logger.warning("Exception '%s' during fetching instances (fetch_cs) from row[domain]='%s'", type(exception), row["domain"])
instances.set_last_error(row["domain"], exception)
- if utils.process_block(blocker, row["domain"], row["reason"], block_level) and block_level == "reject" and config.get("bot_enabled"):
+ if processing.block(blocker, row["domain"], row["reason"], block_level) and block_level == "reject" and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", row["domain"], block_level, blocker)
blockdict.append({
"blocked": row["domain"],
domains.append(domain)
logger.debug("Processing domain='%s' ...", domain)
- processed = utils.process_domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
+ processed = processing.domain(domain, block["blocker"], inspect.currentframe().f_code.co_name)
logger.debug("processed='%s'", processed)
- if utils.process_block(block["blocker"], domain, None, severity) and config.get("bot_enabled"):
+ if processing.block(block["blocker"], domain, None, severity) and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", domain, block["block_level"], block["blocker"])
blockdict.append({
"blocked": domain,
})
if reject_media:
- utils.process_block(block["blocker"], domain, None, "reject_media")
+ processing.block(block["blocker"], domain, None, "reject_media")
if reject_reports:
- utils.process_block(block["blocker"], domain, None, "reject_reports")
+ processing.block(block["blocker"], domain, None, "reject_reports")
logger.debug("block[blocker]='%s'", block["blocker"])
if block["blocker"] != "chaos.social":
continue
logger.debug("Processing domain='%s',row[blocker]='%s'", domain, row["blocker"])
- processed = utils.process_domain(domain, row["blocker"], inspect.currentframe().f_code.co_name)
+ processed = processing.domain(domain, row["blocker"], inspect.currentframe().f_code.co_name)
logger.debug("processed='%s'", processed)
if not processed:
continue
logger.info("Proccessing blocked='%s' ...", block["blocked"])
- utils.process_domain(block["blocked"], "climatejustice.social", inspect.currentframe().f_code.co_name)
+ processing.domain(block["blocked"], "climatejustice.social", inspect.currentframe().f_code.co_name)
blockdict = list()
for blocker in domains:
continue
logger.debug("blocked='%s',reason='%s'", block["blocked"], block["reason"])
- if utils.process_block(blocker, block["blocked"], block["reason"], "reject") and config.get("bot_enabled"):
+ if processing.block(blocker, block["blocked"], block["reason"], "reject") and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], blocker)
blockdict.append({
"blocked": block["blocked"],
block["block_level"] = utils.alias_block_level(block["block_level"])
logger.info("blocked='%s' has been deobfuscated to blocked='%s', adding ...", block["blocked"], blocked)
- if utils.process_block(row["domain"], blocked, block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
+ if processing.block(row["domain"], blocked, block["reason"], block["block_level"]) and block["block_level"] == "reject" and config.get("bot_enabled"):
logger.debug("Appending blocked='%s',reason='%s' for blocker='%s' ...", block["blocked"], block["block_level"], row["domain"])
blockdict.append({
"blocked": blocked,
'domain',
'json',
'locking',
+ 'processing',
'software',
'tidyup',
'version',
--- /dev/null
+ # Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+
+from fba.helpers import domain as domain_helper
+
+from fba.http import federation
+from fba.http import network
+
+from fba.models import blocks
+from fba.models import instances
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def domain(domain: str, blocker: str, command: str) -> bool:
+ logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
+ domain_helper.raise_on(domain)
+ domain_helper.raise_on(blocker)
+
+ if not isinstance(command, str):
+ raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
+ elif command == "":
+ raise ValueError("Parameter 'command' is empty")
+
+ logger.debug("domain='%s' - BEFORE!", domain)
+ domain = deobfuscate(domain, blocker)
+
+ logger.debug("domain='%s' - DEOBFUSCATED!", domain)
+ if instances.has_pending(blocker):
+ logger.debug("Flushing updates for blocker='%s' ...", blocker)
+ instances.update_data(blocker)
+
+ if not is_domain_wanted(domain):
+ logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
+ return False
+ elif instances.is_recent(domain):
+ logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
+ return False
+
+ processed = False
+ try:
+ logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
+ federation.fetch_instances(domain, blocker, None, command)
+ processed = True
+ except network.exceptions as exception:
+ logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
+ instances.set_last_error(domain, exception)
+
+ logger.debug("Checking if domain='%s' has pending updates ...", domain)
+ if instances.has_pending(domain):
+ logger.debug("Flushing updates for domain='%s' ...", domain)
+ instances.update_data(domain)
+
+ logger.debug("processed='%s' - EXIT!", processed)
+ return processed
+
+def block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
+ logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
+ domain_helper.raise_on(blocker)
+ domain_helper.raise_on(blocked)
+
+ added = False
+ if not isinstance(reason, str) and reason is not None:
+ raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
+ elif not isinstance(block_level, str):
+ raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
+ elif block_level == "":
+ raise ValueError("Parameter block_level is empty")
+
+ if not blocks.is_instance_blocked(blocker, blocked, block_level):
+ logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
+ blocks.add_instance(blocker, blocked, reason, block_level)
+ added = True
+ else:
+ logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
+ blocks.update_last_seen(blocker, blocked, block_level)
+
+ logger.debug("added='%s' - EXIT!", added)
+ return added
import logging
-import json
import reqto
import requests
import urllib3
from fba.helpers import domain as domain_helper
from fba.helpers import tidyup
-from fba.http import federation
from fba.http import network
-from fba.models import blocks
from fba.models import instances
logging.basicConfig(level=logging.INFO)
logger.debug("response[]='%s' - EXIT!", type(response))
return response
-def process_domain(domain: str, blocker: str, command: str) -> bool:
- logger.debug("domain='%s',blocker='%s',command='%s' - CALLED!", domain, blocker, command)
- domain_helper.raise_on(domain)
- domain_helper.raise_on(blocker)
-
- if not isinstance(command, str):
- raise ValueError(f"Parameter command[]='{type(command)}' is not of type 'str'")
- elif command == "":
- raise ValueError("Parameter 'command' is empty")
-
- logger.debug("domain='%s' - BEFORE!", domain)
- domain = deobfuscate(domain, blocker)
-
- logger.debug("domain='%s' - DEOBFUSCATED!", domain)
- if instances.has_pending(blocker):
- logger.debug("Flushing updates for blocker='%s' ...", blocker)
- instances.update_data(blocker)
-
- if not is_domain_wanted(domain):
- logger.debug("domain='%s' is not wanted - SKIPPED!", domain)
- return False
- elif instances.is_recent(domain):
- logger.debug("domain='%s' has been recently checked - SKIPPED!", domain)
- return False
-
- processed = False
- try:
- logger.info("Fetching instances for domain='%s',blocker='%s',command='%s' ...", domain, blocker, command)
- federation.fetch_instances(domain, blocker, None, command)
- processed = True
- except network.exceptions as exception:
- logger.warning("Exception '%s' during fetching instances (%s) from domain='%s'", type(exception), command, domain)
- instances.set_last_error(domain, exception)
-
- logger.debug("Checking if domain='%s' has pending updates ...", domain)
- if instances.has_pending(domain):
- logger.debug("Flushing updates for domain='%s' ...", domain)
- instances.update_data(domain)
-
- logger.debug("processed='%s' - EXIT!", processed)
- return processed
-
def find_domains(tags: bs4.element.ResultSet, search: str) -> list:
logger.debug("tags[%s]()=%d,search='%s' - CALLED!", type(tags), len(tags), search)
if not isinstance(tags, bs4.element.ResultSet):
logger.debug("domain='%s' - EXIT!", domain)
return domain
-def process_block(blocker: str, blocked: str, reason: str, block_level: str) -> bool:
- logger.debug("blocker='%s',blocked='%s',reason='%s',block_level='%s' - CALLED!", blocker, blocked, reason, block_level)
- domain_helper.raise_on(blocker)
- domain_helper.raise_on(blocked)
-
- added = False
- if not isinstance(reason, str) and reason is not None:
- raise ValueError(f"Parameter reason[]='{type(reason)}' is not of type 'str'")
- elif not isinstance(block_level, str):
- raise ValueError(f"Parameter block_level[]='{type(block_level)}' is not of type 'str'")
- elif block_level == "":
- raise ValueError("Parameter block_level is empty")
-
- if not blocks.is_instance_blocked(blocker, blocked, block_level):
- logger.debug("Invoking blocks.add_instance(%s, %s, %s, %s) ...", blocker, blocked, reason, block_level)
- blocks.add_instance(blocker, blocked, reason, block_level)
- added = True
- else:
- logger.debug("Updating block last seen and reason for blocker='%s',blocked='%s' ...", blocker, blocked)
- blocks.update_last_seen(blocker, blocked, block_level)
-
- logger.debug("added='%s' - EXIT!", added)
- return added
-
def alias_block_level(block_level: str) -> str:
logger.debug("block_level='%s' - CALLED!", block_level)
if not isinstance(block_level, str):