From 87a945b2956b793cc8d287c3a2fd4b2b6fab68cb Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 6 Aug 2022 17:06:55 +0000 Subject: [PATCH] More prevention of double processing of the same content --- database.sql | 20 +++++++- doc/database.md | 2 + doc/database/db_arrived-activity.md | 22 +++++++++ doc/database/db_processed-activity.md | 22 +++++++++ src/Module/Debug/ActivityPubConversion.php | 7 +-- src/Protocol/ActivityPub/Processor.php | 55 ++++++---------------- src/Protocol/ActivityPub/Receiver.php | 41 ++++++++++++++-- static/dbstructure.config.php | 24 +++++++++- 8 files changed, 141 insertions(+), 52 deletions(-) create mode 100644 doc/database/db_arrived-activity.md create mode 100644 doc/database/db_processed-activity.md diff --git a/database.sql b/database.sql index 7663b71590..6126e58a87 100644 --- a/database.sql +++ b/database.sql @@ -1,6 +1,6 @@ -- ------------------------------------------ -- Friendica 2022.09-dev (Giant Rhubarb) --- DB_UPDATE_VERSION 1477 +-- DB_UPDATE_VERSION 1478 -- ------------------------------------------ @@ -1716,6 +1716,24 @@ CREATE TABLE IF NOT EXISTS `user-contact` ( FOREIGN KEY (`uri-id`) REFERENCES `item-uri` (`id`) ON UPDATE RESTRICT ON DELETE CASCADE ) DEFAULT COLLATE utf8mb4_general_ci COMMENT='User specific public contact data'; +-- +-- TABLE arrived-activity +-- +CREATE TABLE IF NOT EXISTS `arrived-activity` ( + `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity', + `received` datetime COMMENT 'Receiving date', + PRIMARY KEY(`object-id`) +) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities'; + +-- +-- TABLE processed-activity +-- +CREATE TABLE IF NOT EXISTS `processed-activity` ( + `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity', + `received` datetime COMMENT 'Receiving date', + PRIMARY KEY(`object-id`) +) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities'; + -- -- TABLE worker-ipc -- diff --git a/doc/database.md b/doc/database.md index 2bd2fa5f49..fe505ce18b 100644 --- a/doc/database.md +++ b/doc/database.md @@ -13,6 +13,7 @@ Database Tables | [application](help/database/db_application) | OAuth application | | [application-marker](help/database/db_application-marker) | Timeline marker | | [application-token](help/database/db_application-token) | OAuth user token | +| [arrived-activity](help/database/db_arrived-activity) | Id of arrived activities | | [attach](help/database/db_attach) | file attachments | | [cache](help/database/db_cache) | Stores temporary data | | [config](help/database/db_config) | main configuration storage | @@ -67,6 +68,7 @@ Database Tables | [post-user](help/database/db_post-user) | User specific post data | | [post-user-notification](help/database/db_post-user-notification) | User post notifications | | [process](help/database/db_process) | Currently running system processes | +| [processed-activity](help/database/db_processed-activity) | Id of processed activities | | [profile](help/database/db_profile) | user profiles data | | [profile_field](help/database/db_profile_field) | Custom profile fields | | [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers | diff --git a/doc/database/db_arrived-activity.md b/doc/database/db_arrived-activity.md new file mode 100644 index 0000000000..1587c6e312 --- /dev/null +++ b/doc/database/db_arrived-activity.md @@ -0,0 +1,22 @@ +Table arrived-activity +=========== + +Id of arrived activities + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- | +| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | | +| received | Receiving date | datetime | YES | | NULL | | + +Indexes +------------ + +| Name | Fields | +| ------- | --------- | +| PRIMARY | object-id | + + +Return to [database documentation](help/database) diff --git a/doc/database/db_processed-activity.md b/doc/database/db_processed-activity.md new file mode 100644 index 0000000000..618b46f972 --- /dev/null +++ b/doc/database/db_processed-activity.md @@ -0,0 +1,22 @@ +Table processed-activity +=========== + +Id of processed activities + +Fields +------ + +| Field | Description | Type | Null | Key | Default | Extra | +| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- | +| object-id | object id of the incoming activity | varbinary(255) | NO | PRI | NULL | | +| received | Receiving date | datetime | YES | | NULL | | + +Indexes +------------ + +| Name | Fields | +| ------- | --------- | +| PRIMARY | object-id | + + +Return to [database documentation](help/database) diff --git a/src/Module/Debug/ActivityPubConversion.php b/src/Module/Debug/ActivityPubConversion.php index ec7fee3f46..8593de81c0 100644 --- a/src/Module/Debug/ActivityPubConversion.php +++ b/src/Module/Debug/ActivityPubConversion.php @@ -22,15 +22,10 @@ namespace Friendica\Module\Debug; use Friendica\BaseModule; -use Friendica\Content\Text; -use Friendica\Core\Logger; use Friendica\Core\Renderer; use Friendica\DI; -use Friendica\Model\Item; -use Friendica\Model\Tag; use Friendica\Protocol\ActivityPub; use Friendica\Util\JsonLD; -use Friendica\Util\XML; class ActivityPubConversion extends BaseModule { @@ -123,7 +118,7 @@ class ActivityPubConversion extends BaseModule 'content' => visible_whitespace(var_export($object_data, true)) ]; - $item = ActivityPub\Processor::createItem($object_data); + $item = ActivityPub\Processor::createItem($object_data, true); $results[] = [ 'title' => DI::l10n()->t('Result Item'), diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index d345ba78cf..a8d51da43a 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -60,33 +60,29 @@ class Processor const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:'; const CACHEKEY_JUST_FETCHED = 'processor:isJustFetched:'; - static $processed = []; - /** - * Add an activity id to the list of processed ids + * Add an object id to the list of processed ids * * @param string $id * * @return void */ - public static function addActivityId(string $id) + private static function addActivityId(string $id) { - self::$processed[] = $id; - if (count(self::$processed) > 100) { - self::$processed = array_slice(self::$processed, 1); - } + DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); + DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); } /** - * Checks if the given has just been processed + * Checks if the given object id has just been processed * * @param string $id * * @return boolean */ - public static function isProcessed(string $id): bool + private static function isProcessed(string $id): bool { - return in_array($id, self::$processed); + return DBA::exists('processed-activity', ['object-id' => $id]); } /** @@ -233,7 +229,7 @@ class Processor $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]); if (!DBA::isResult($item)) { Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]); - $item = self::createItem($activity); + $item = self::createItem($activity, false); if (empty($item)) { return; } @@ -303,7 +299,7 @@ class Processor * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @throws \ImagickException */ - public static function createItem(array $activity, bool $fetch_parents = true): array + public static function createItem(array $activity, bool $fetch_parents): array { if (self::isProcessed($activity['id'])) { Logger::info('Id is already processed', ['id' => $activity['id']]); @@ -324,10 +320,10 @@ class Processor $item['object-type'] = Activity\ObjectType::COMMENT; } - if (!empty($activity['context'])) { - $item['conversation'] = $activity['context']; - } elseif (!empty($activity['conversation'])) { + if (!empty($activity['conversation'])) { $item['conversation'] = $activity['conversation']; + } elseif (!empty($activity['context'])) { + $item['conversation'] = $activity['context']; } if (!empty($item['conversation'])) { @@ -340,6 +336,7 @@ class Processor $conversation = []; } + Logger::debug('Create Item', ['id' => $activity['id'], 'conversation' => $item['conversation'] ?? '']); if (empty($activity['author']) && empty($activity['actor'])) { Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]); return []; @@ -490,11 +487,6 @@ class Processor */ private static function fetchParent(array $activity): string { - if (self::hasJustBeenFetched($activity['reply-to-id'])) { - Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]); - return ''; - } - $recursion_depth = $activity['recursion-depth'] ?? 0; if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) { @@ -552,23 +544,6 @@ class Processor return ''; } - /** - * Check if a given activity has recently been fetched - * - * @param string $url - * @return boolean - */ - private static function hasJustBeenFetched(string $url): bool - { - $cachekey = self::CACHEKEY_JUST_FETCHED . $url; - $time = DI::cache()->get($cachekey); - if (is_null($time)) { - DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES); - return false; - } - return ($time + 300) > time(); - } - /** * Check if a given activity is no longer available * @@ -656,7 +631,7 @@ class Processor public static function createActivity(array $activity, string $verb) { $activity['reply-to-id'] = $activity['object_id']; - $item = self::createItem($activity); + $item = self::createItem($activity, false); if (empty($item)) { return; } @@ -1419,7 +1394,7 @@ class Processor $ldactivity = JsonLD::compact($activity); - $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1; + $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 0; if (!empty($relay_actor)) { $ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor); diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 1f7946af22..e8267b0eaa 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -586,11 +586,19 @@ class Receiver $object_data['object_activity'] = $activity; } - if (($type == 'as:Create') && Queue::exists($object_data['object_id'], $type)) { - Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); - return true; - } + if (($type == 'as:Create') && $trust_source) { + if (self::hasArrived($object_data['object_id'])) { + Logger::info('The activity already arrived.', ['id' => $object_data['object_id']]); + return true; + } + self::addArrivedId($object_data['object_id']); + if (Queue::exists($object_data['object_id'], $type)) { + Logger::info('The activity is already added.', ['id' => $object_data['object_id']]); + return true; + } + } + if (DI::config()->get('system', 'decoupled_receiver') && ($trust_source || DI::config()->get('debug', 'ap_inbox_store_untrusted'))) { $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push, $trust_source); } @@ -1883,4 +1891,29 @@ class Receiver return $object_data; } + + /** + * Add an object id to the list of arrived activities + * + * @param string $id + * + * @return void + */ + private static function addArrivedId(string $id) + { + DBA::delete('arrived-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]); + DBA::insert('arrived-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]); + } + + /** + * Checks if the given object already arrived before + * + * @param string $id + * + * @return boolean + */ + private static function hasArrived(string $id): bool + { + return DBA::exists('arrived-activity', ['object-id' => $id]); + } } diff --git a/static/dbstructure.config.php b/static/dbstructure.config.php index f72ec08020..df5115dcb4 100644 --- a/static/dbstructure.config.php +++ b/static/dbstructure.config.php @@ -55,7 +55,7 @@ use Friendica\Database\DBA; if (!defined('DB_UPDATE_VERSION')) { - define('DB_UPDATE_VERSION', 1477); + define('DB_UPDATE_VERSION', 1478); } return [ @@ -1723,6 +1723,28 @@ return [ "uri-id_uid" => ["UNIQUE", "uri-id", "uid"], ] ], + "arrived-activity" => [ + "comment" => "Id of arrived activities", + "fields" => [ + "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"], + "received" => ["type" => "datetime", "comment" => "Receiving date"], + ], + "indexes" => [ + "PRIMARY" => ["object-id"], + ], + "engine" => "MEMORY", + ], + "processed-activity" => [ + "comment" => "Id of processed activities", + "fields" => [ + "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"], + "received" => ["type" => "datetime", "comment" => "Receiving date"], + ], + "indexes" => [ + "PRIMARY" => ["object-id"], + ], + "engine" => "MEMORY", + ], "worker-ipc" => [ "comment" => "Inter process communication between the frontend and the worker", "fields" => [ -- 2.39.5