from fba.models import blocks
from fba.models import instances
+from fba.models import obfuscation
from fba.models import sources
from fba.networks import friendica
logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning("Cannot deobfuscate block[blocked]='%s',blocker='%s',software='%s' - SKIPPED!", block["blocked"], blocker, software)
+
+ if not obfuscation.is_added(block["blocked"]):
+ logger.debug("Invoking add(%s)", block["blocked"])
+ obfuscation.add(block["blocked"])
+
continue
deobfuscated = deobfuscated + 1
logger.debug("row[]='%s'", type(row))
if row is None:
logger.warning("Cannot deobfuscate block[blocked]='%s',blocker='%s',software='%s' - SKIPPED!", block["blocked"], blocker, software)
+
+ if not obfuscation.is_added(block["blocked"]):
+ logger.debug("Invoking add(%s)", block["blocked"])
+ obfuscation.add(block["blocked"])
+
continue
deobfuscated = deobfuscated + 1
logger.debug("blocked[%s]='%s',block[blocked]='%s'", type(blocked), blocked, block["blocked"])
if blocked is not None and blocked != block["blocked"]:
- logger.debug("blocked='%s' was deobfuscated to blocked='%s'", block["blocked"], blocked)
+ logger.debug("block[blocked]='%s' was deobfuscated to blocked='%s'", block["blocked"], blocked)
if blacklist.is_blacklisted(blocked):
logger.debug("blocked='%s' is blacklisted - SKIPPED!", blocked)
elif blocks.is_instance_blocked(row["domain"], blocked):
logger.debug("blocked='%s' is already blocked by domain='%s' - SKIPPED!", blocked, row["domain"])
continue
+ elif obfuscation.is_added(block["blocked"]):
+ logger.debug("Deleting deobfuscated pattern block[blocked]='%s' ...", block["blocked"])
+ obfuscation.delete(block["blocked"])
logger.debug("block[block_level]='%s' - BEFORE!", block["block_level"])
block["block_level"] = blocks.alias_block_level(block["block_level"])
--- /dev/null
+# Fedi API Block - An aggregator for fetching blocking data from fediverse nodes
+# Copyright (C) 2023 Free Software Foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+
+import time
+
+from fba import database
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def is_added(pattern: str) -> bool:
+ logger.debug("pattern='%s' - CALLED!", pattern)
+ if not isinstance(pattern, str):
+ raise ValueError(f"pattern[]='{type(pattern)}' is not of type 'str'")
+ elif pattern == "":
+ raise ValueError("Parametern 'pattern' is an empty string")
+
+ database.cursor.execute("SELECT * FROM obfuscation WHERE pattern=? LIMIT 1", [pattern])
+
+ row = database.cursor.fetchone()
+ logger.debug("row[]='%s'", row)
+
+ added = (row is not None)
+
+ logger.debug("added='%s' - EXIT!", added)
+ return added
+
+def add(pattern: str) -> None:
+ logger.debug("pattern='%s' - CALLED!", pattern)
+ if not isinstance(pattern, str):
+ raise ValueError(f"pattern[]='{type(pattern)}' is not of type 'str'")
+ elif pattern == "":
+ raise ValueError("Parametern 'pattern' is an empty string")
+ elif is_added(pattern):
+ raise Exception(f"pattern='{pattern}' is already added but function was invoked")
+
+ database.cursor.execute("INSERT INTO obfuscation (pattern, added) VALUES (?, ?)", (
+ pattern,
+ time.time()
+ ))
+
+ logger.debug("EXIT!")
+
+def delete (pattern: str) -> None:
+ logger.debug("pattern='%s' - CALLED!", pattern)
+ if not isinstance(pattern, str):
+ raise ValueError(f"pattern[]='{type(pattern)}' is not of type 'str'")
+ elif pattern == "":
+ raise ValueError("Parametern 'pattern' is an empty string")
+ elif not is_added(pattern):
+ raise Exception(f"pattern='{pattern}' is not added but function was invoked")
+
+ database.cursor.execute("DELETE FROM obfuscation WHERE pattern=? LIMIT 1", [pattern])
+
+ logger.debug("EXIT!")