]> git.mxchange.org Git - friendica.git/commitdiff
Improved asynchronous message procession
authorMichael <heluecht@pirati.ca>
Wed, 25 Oct 2023 20:16:36 +0000 (20:16 +0000)
committerMichael <heluecht@pirati.ca>
Wed, 25 Oct 2023 20:16:36 +0000 (20:16 +0000)
src/Content/Item.php
src/Model/Item.php
src/Model/Post/Media.php
src/Protocol/ActivityPub/ClientToServer.php
src/Protocol/ActivityPub/Processor.php
src/Protocol/ActivityPub/Queue.php
src/Protocol/ActivityPub/Receiver.php
static/defaults.config.php

index c604f26215451afdca88027c104f7fcfa8d7909a..48dcb80d321a04579cbd9e1f7ff4257c5f11306b 100644 (file)
@@ -635,7 +635,7 @@ class Item
        public function addSharedPost(array $item, string $body = ''): string
        {
                if (empty($body)) {
-                       $body = $item['body'];
+                       $body = $item['body'] ?? '';
                }
 
                if (empty($item['quote-uri-id']) || ($item['quote-uri-id'] == $item['uri-id'])) {
index 04853314f5b90adf34659439982307c53ac6ef36..0e8a04b3efafe1e15577bce2af37804f13fe5eca 100644 (file)
@@ -3905,11 +3905,12 @@ class Item
         * Fetches item for given URI or plink
         *
         * @param string $uri
-        * @param integer $uid
+        * @param int    $uid
+        * @param int    $completion
         *
         * @return integer item id
         */
-       public static function fetchByLink(string $uri, int $uid = 0): int
+       public static function fetchByLink(string $uri, int $uid = 0, int $completion = ActivityPub\Receiver::COMPLETION_MANUAL): int
        {
                Logger::info('Trying to fetch link', ['uid' => $uid, 'uri' => $uri]);
                $item_id = self::searchByLink($uri, $uid);
@@ -3930,7 +3931,7 @@ class Item
                        return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
                }
 
-               $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri, [], '', ActivityPub\Receiver::COMPLETION_MANUAL, $uid);
+               $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri, [], '', $completion, $uid);
 
                if ($fetched_uri) {
                        $item_id = self::searchByLink($fetched_uri, $uid);
@@ -3990,7 +3991,7 @@ class Item
                }
 
                $url = $shared['message_id'] ?: $shared['link'];
-               $id = self::fetchByLink($url);
+               $id = self::fetchByLink($url, 0, ActivityPub\Receiver::COMPLETION_ASYNC);
                if (!$id) {
                        Logger::notice('Post could not be fetched.', ['url' => $url, 'uid' => $uid]);
                        return 0;
index 4ba67de9a82dadfebc92bc6459672c1fcc98d23e..671c19f454cf66b01ae9503cf25ccd396ed2ab07 100644 (file)
@@ -35,6 +35,7 @@ use Friendica\Model\Photo;
 use Friendica\Model\Post;
 use Friendica\Network\HTTPClient\Client\HttpClientAccept;
 use Friendica\Network\HTTPClient\Client\HttpClientOptions;
+use Friendica\Protocol\ActivityPub;
 use Friendica\Util\Images;
 use Friendica\Util\Network;
 use Friendica\Util\ParseUrl;
@@ -253,7 +254,7 @@ class Media
         */
        private static function addActivity(array $media): array
        {
-               $id = Item::fetchByLink($media['url']);
+               $id = Item::fetchByLink($media['url'], 0, ActivityPub\Receiver::COMPLETION_ASYNC);
                if (empty($id)) {
                        return $media;
                }
index 2b52d17b767dc22ef39bf7519182aabfd1ff9f32..8bc4dcc0597345ec2d098829eeb646d05d803a06 100644 (file)
@@ -142,7 +142,7 @@ class ClientToServer
         */
        private static function updateContent(int $uid, string $object_id, array $application, array $ldactivity): array
        {
-               $id            = Item::fetchByLink($object_id, $uid);
+               $id            = Item::fetchByLink($object_id, $uid, ActivityPub\Receiver::COMPLETION_ASYNC);
                $original_post = Post::selectFirst(['uri-id'], ['uid' => $uid, 'origin' => true, 'id' => $id]);
                if (empty($original_post)) {
                        Logger::debug('Item not found or does not belong to the user', ['id' => $id, 'uid' => $uid, 'object_id' => $object_id, 'activity' => $ldactivity]);
index df19ab8b8ffd152fea35aa1df61d1bd31e0cc028..2e204717b8da99e6e8be44bff4a2c80915a5f296 100644 (file)
@@ -348,12 +348,8 @@ class Processor
 
                if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
                        $result = self::fetchParent($activity, !empty($conversation));
-                       if (!empty($result)) {
-                               if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
-                                       $item['thr-parent'] = $result;
-                               }
-                       } elseif (empty($conversation)) {
-                               return [];
+                       if (!empty($result) && ($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
+                               $item['thr-parent'] = $result;
                        }
                }
 
@@ -532,39 +528,35 @@ class Processor
 
                self::addActivityId($activity['reply-to-id']);
 
-               if (!DI::config()->get('system', 'fetch_by_worker')) {
-                       $in_background = false;
+               $completion = $activity['completion-mode'] ?? Receiver::COMPLETION_NONE;
+
+               if (DI::config()->get('system', 'decoupled_receiver') && ($completion != Receiver::COMPLETION_MANUAL)) {
+                       $in_background = true;
                }
 
                $recursion_depth = $activity['recursion-depth'] ?? 0;
 
                if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) {
-                       Logger::info('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+                       Logger::info('Parent not found. Try to refetch it.', ['completion' => $completion, 'recursion-depth' => $recursion_depth, 'parent' => $activity['reply-to-id']]);
                        $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
                        if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
                                Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
                                if (!empty($activity['entry-id'])) {
                                        Queue::deleteById($activity['entry-id']);
                                }
-                               return '';
                        } elseif (!empty($result)) {
-                               $exists = Post::exists(['uri' => [$result, $activity['reply-to-id']]]);
-                               if ($exists) {
-                                       Logger::info('The activity has been fetched and created.', ['parent' => $result]);
-                                       return $result;
-                               } elseif (DI::config()->get('system', 'fetch_by_worker') || DI::config()->get('system', 'decoupled_receiver')) {
-                                       Logger::info('The activity has been fetched and will hopefully be created later.', ['parent' => $result]);
+                               $post = Post::selectFirstPost(['uri'], ['uri' => [$result, $activity['reply-to-id']]]);
+                               if (!empty($post['uri'])) {
+                                       Logger::info('The activity has been fetched and created.', ['result' => $result, 'uri' => $post['uri']]);
+                                       return $post['uri'];
                                } else {
                                        Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]);
                                        if (!empty($activity['entry-id'])) {
                                                Queue::deleteById($activity['entry-id']);
                                        }
                                }
-                               return '';
-                       }
-                       if (empty($result) && !DI::config()->get('system', 'fetch_by_worker')) {
-                               return '';
                        }
+                       return '';
                } elseif (self::isActivityGone($activity['reply-to-id'])) {
                        Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
                        if ($in_background) {
@@ -586,7 +578,7 @@ class Processor
                        Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
                        Fetch::add($activity['reply-to-id']);
                        $activity['recursion-depth'] = 0;
-                       $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+                       $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC);
                        Fetch::setWorkerId($activity['reply-to-id'], $wid);
                } else {
                        Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]);
@@ -867,7 +859,7 @@ class Processor
                $content = self::addMentionLinks($content, $activity['tags']);
 
                if (!empty($activity['quote-url'])) {
-                       $id = Item::fetchByLink($activity['quote-url']);
+                       $id = Item::fetchByLink($activity['quote-url'], 0, ActivityPub\Receiver::COMPLETION_ASYNC);
                        if ($id) {
                                $shared_item = Post::selectFirst(['uri-id'], ['id' => $id]);
                                $item['quote-uri-id'] = $shared_item['uri-id'];
@@ -1456,7 +1448,7 @@ class Processor
                        if (empty($post['id'])) {
                                continue;
                        }
-                       $id = Item::fetchByLink($post['id']);
+                       $id = Item::fetchByLink($post['id'], 0, ActivityPub\Receiver::COMPLETION_ASYNC);
                        if (!empty($id)) {
                                $item = Post::selectFirst(['uri-id', 'featured', 'author-id'], ['id' => $id]);
                                if (!empty($item['uri-id'])) {
index 47c8e632a75230437543698054b47e19ba0129cf..3a234e945ad753188067c1f363cc10e146cb9713 100644 (file)
@@ -22,6 +22,7 @@
 namespace Friendica\Protocol\ActivityPub;
 
 use Friendica\Core\Logger;
+use Friendica\Core\Worker;
 use Friendica\Database\Database;
 use Friendica\Database\DBA;
 use Friendica\DI;
@@ -285,9 +286,25 @@ class Queue
                        }
                }
 
-               if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
-                       // This entry belongs to some other entry that should be processed first
-                       return false;
+               if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id'])) {
+                       if (DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
+                               // This entry belongs to some other entry that should be processed first
+                               return false;
+                       }
+                       if (!Post::exists(['uri' => $entry['in-reply-to-id']])) {
+                               // This entry belongs to some other entry that need to be fetched first
+                               if (Fetch::hasWorker($entry['in-reply-to-id'])) {
+                                       Logger::debug('Fetching of the activity is already queued', ['id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]);
+                                       return false;
+                               }
+                               Fetch::add($entry['in-reply-to-id']);
+                               $activity = json_decode($entry['activity'], true);
+                               $activity['recursion-depth'] = 0;
+                               $wid = Worker::add(Worker::PRIORITY_HIGH, 'FetchMissingActivity', $entry['in-reply-to-id'], $activity, '', Receiver::COMPLETION_ASYNC);
+                               Fetch::setWorkerId($entry['in-reply-to-id'], $wid);
+                               Logger::debug('Fetch missing activity', ['wid' => $wid, 'id' => $entry['activity-id'], 'reply-to-id' => $entry['in-reply-to-id']]);
+                               return false;
+                       }
                }
 
                return true;
index 567fe3b86f916a33bc460b34a1e44f9a2dd25d02..992b428c35740c0ed22c0dc241d08a19a248a18e 100644 (file)
@@ -80,6 +80,7 @@ class Receiver
        const COMPLETION_RELAY    = 2;
        const COMPLETION_MANUAL   = 3;
        const COMPLETION_AUTO     = 4;
+       const COMPLETION_ASYNC    = 5;
 
        /**
         * Checks incoming message from the inbox
@@ -681,7 +682,7 @@ class Receiver
                        return true;
                }
 
-               if (!empty($object_data['entry-id']) && $decouple && ($push || ($completion == self::COMPLETION_RELAY))) {
+               if (!empty($object_data['entry-id']) && $decouple && ($push || in_array($completion, [self::COMPLETION_RELAY, self::COMPLETION_ASYNC]))) {
                        if (Queue::isProcessable($object_data['entry-id'])) {
                                // We delay by 5 seconds to allow to accumulate all receivers
                                $delayed = date(DateTimeFormat::MYSQL, time() + 5);
index b639e19166d43906f332bd63a642e5654d6cb4c8..eb39767b66a5d082a7aa5904306a3f0ee1343a8e 100644 (file)
@@ -298,10 +298,6 @@ return [
                // Priority for the expiry notification
                'expire-notify-priority' => Friendica\Core\Worker::PRIORITY_LOW,
 
-               // fetch_by_worker (Boolean)
-               // Fetch missing posts via a background process
-               'fetch_by_worker' => false,
-
                // fetch_featured_posts (Boolean)
                // Fetch featured posts from all contacts
                'fetch_featured_posts' => false,