]> git.mxchange.org Git - friendica.git/commitdiff
Conversation entries will now be stored asynchronous if possible
authorMichael <heluecht@pirati.ca>
Wed, 10 Aug 2022 09:28:18 +0000 (09:28 +0000)
committerMichael <heluecht@pirati.ca>
Wed, 10 Aug 2022 09:28:18 +0000 (09:28 +0000)
database.sql
doc/database.md
doc/database/db_fetched-activity.md [new file with mode: 0644]
doc/database/db_processed-activity.md [deleted file]
src/Content/Conversation.php
src/Model/Item.php
src/Protocol/ActivityPub/Processor.php
static/dbstructure.config.php

index 6126e58a87a96b0204d035d37ffa3eb18d84ec7c..6df09b14b962c44ac0976a8c590ab0ddc0e1a612 100644 (file)
@@ -1,6 +1,6 @@
 -- ------------------------------------------
 -- Friendica 2022.09-dev (Giant Rhubarb)
--- DB_UPDATE_VERSION 1478
+-- DB_UPDATE_VERSION 1479
 -- ------------------------------------------
 
 
@@ -1726,13 +1726,13 @@ CREATE TABLE IF NOT EXISTS `arrived-activity` (
 ) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of arrived activities';
 
 --
--- TABLE processed-activity
+-- TABLE fetched-activity
 --
-CREATE TABLE IF NOT EXISTS `processed-activity` (
-       `object-id` varbinary(255) NOT NULL COMMENT 'object id of the incoming activity',
+CREATE TABLE IF NOT EXISTS `fetched-activity` (
+       `object-id` varbinary(255) NOT NULL COMMENT 'object id of fetched activity',
        `received` datetime COMMENT 'Receiving date',
         PRIMARY KEY(`object-id`)
-) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of processed activities';
+) ENGINE=MEMORY DEFAULT COLLATE utf8mb4_general_ci COMMENT='Id of fetched activities';
 
 --
 -- TABLE worker-ipc
index fe505ce18b1e28fc9149e142c03496b9bc4e76b8..5aed24e91a4eca77f224fb741859141ee88a2b6d 100644 (file)
@@ -26,6 +26,7 @@ Database Tables
 | [event](help/database/db_event) | Events |
 | [fcontact](help/database/db_fcontact) | Diaspora compatible contacts - used in the Diaspora implementation |
 | [fetch-entry](help/database/db_fetch-entry) |  |
+| [fetched-activity](help/database/db_fetched-activity) | Id of fetched activities |
 | [fsuggest](help/database/db_fsuggest) | friend suggestion stuff |
 | [group](help/database/db_group) | privacy groups, group info |
 | [group_member](help/database/db_group_member) | privacy groups, member info |
@@ -68,7 +69,6 @@ Database Tables
 | [post-user](help/database/db_post-user) | User specific post data |
 | [post-user-notification](help/database/db_post-user-notification) | User post notifications |
 | [process](help/database/db_process) | Currently running system processes |
-| [processed-activity](help/database/db_processed-activity) | Id of processed activities |
 | [profile](help/database/db_profile) | user profiles data |
 | [profile_field](help/database/db_profile_field) | Custom profile fields |
 | [push_subscriber](help/database/db_push_subscriber) | Used for OStatus: Contains feed subscribers |
diff --git a/doc/database/db_fetched-activity.md b/doc/database/db_fetched-activity.md
new file mode 100644 (file)
index 0000000..0a3ae7f
--- /dev/null
@@ -0,0 +1,22 @@
+Table fetched-activity
+===========
+
+Id of fetched activities
+
+Fields
+------
+
+| Field     | Description                   | Type           | Null | Key | Default | Extra |
+| --------- | ----------------------------- | -------------- | ---- | --- | ------- | ----- |
+| object-id | object id of fetched activity | varbinary(255) | NO   | PRI | NULL    |       |
+| received  | Receiving date                | datetime       | YES  |     | NULL    |       |
+
+Indexes
+------------
+
+| Name    | Fields    |
+| ------- | --------- |
+| PRIMARY | object-id |
+
+
+Return to [database documentation](help/database)
diff --git a/doc/database/db_processed-activity.md b/doc/database/db_processed-activity.md
deleted file mode 100644 (file)
index 618b46f..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-Table processed-activity
-===========
-
-Id of processed activities
-
-Fields
-------
-
-| Field     | Description                        | Type           | Null | Key | Default | Extra |
-| --------- | ---------------------------------- | -------------- | ---- | --- | ------- | ----- |
-| object-id | object id of the incoming activity | varbinary(255) | NO   | PRI | NULL    |       |
-| received  | Receiving date                     | datetime       | YES  |     | NULL    |       |
-
-Indexes
-------------
-
-| Name    | Fields    |
-| ------- | --------- |
-| PRIMARY | object-id |
-
-
-Return to [database documentation](help/database)
index 3941c514a9285fca2042c30c1bb2f803a91e0761..744e9ed18ff2bd0988e641229c502985f8bcbf4f 100644 (file)
@@ -854,7 +854,7 @@ class Conversation
                                $row['direction'] = ['direction' => 7, 'title' => $this->l10n->t('You had been addressed (%s).', 'bcc')];
                                break;
                        case ItemModel::PR_FOLLOWER:
-                               $row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['author-name'])];
+                               $row['direction'] = ['direction' => 6, 'title' => $this->l10n->t('You are following %s.', $row['causer-name'] ?: $row['author-name'])];
                                break;
                        case ItemModel::PR_TAG:
                                $row['direction'] = ['direction' => 4, 'title' => $this->l10n->t('You subscribed to one or more tags in this post.')];
index 8566643ed2eb8a4127a681b5428e51dd1fb07e6c..c5b8bcf3c21592d26e42f12618079a5313030d54 100644 (file)
@@ -1513,7 +1513,7 @@ class Item
 
                $is_reshare = ($item['gravity'] == GRAVITY_ACTIVITY) && ($item['verb'] == Activity::ANNOUNCE);
 
-               if ((($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
+               if (($uid != 0) && (($item['gravity'] == GRAVITY_PARENT) || $is_reshare) &&
                        DI::pConfig()->get($uid, 'system', 'accept_only_sharer') == self::COMPLETION_NONE &&
                        !in_array($item['post-reason'], [self::PR_FOLLOWER, self::PR_TAG, self::PR_TO, self::PR_CC])) {
                        Logger::info('Contact is not a follower, thread will not be stored', ['author' => $item['author-link'], 'uid' => $uid]);
index f142b7e91d148070b0234ef5de97c2923900c028..c029d1bc6ed0a98c9aade775f46561b56d4a32f1 100644 (file)
@@ -69,20 +69,20 @@ class Processor
         */
        private static function addActivityId(string $id)
        {
-               DBA::delete('processed-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
-               DBA::insert('processed-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
+               DBA::delete('fetched-activity', ["`received` < ?", DateTimeFormat::utc('now - 5 minutes')]);
+               DBA::insert('fetched-activity', ['object-id' => $id, 'received' => DateTimeFormat::utcNow()]);
        }
 
        /**
-        * Checks if the given object id has just been processed
+        * Checks if the given object id has just been fetched
         *
         * @param string $id
         *
         * @return boolean
         */
-       private static function isProcessed(string $id): bool
+       private static function isFetched(string $id): bool
        {
-               return DBA::exists('processed-activity', ['object-id' => $id]);
+               return DBA::exists('fetched-activity', ['object-id' => $id]);
        }
 
        /**
@@ -303,13 +303,6 @@ class Processor
         */
        public static function createItem(array $activity, bool $fetch_parents): array
        {
-               if (self::isProcessed($activity['id']) && !Post::exists(['uri' => $activity['id']])) {
-                       Logger::info('Id is already processed', ['id' => $activity['id']]);
-                       return [];
-               }
-
-               self::addActivityId($activity['id']);
-
                $item = [];
                $item['verb'] = Activity::POST;
                $item['thr-parent'] = $activity['reply-to-id'];
@@ -333,6 +326,7 @@ class Processor
                        if (!empty($conversation)) {
                                Logger::debug('Got conversation', ['conversation' => $item['conversation'], 'parent' => $conversation]);
                                $item['parent-uri'] = $conversation['uri'];
+                               $item['parent-uri-id'] = ItemURI::getIdByURI($item['parent-uri']);
                        }
                } else {
                        $conversation = [];
@@ -350,7 +344,7 @@ class Processor
                }
 
                if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
-                       $result = self::fetchParent($activity);
+                       $result = self::fetchParent($activity, !empty($conversation));
                        if (!empty($result)) {
                                if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
                                        $item['thr-parent'] = $result;
@@ -457,6 +451,8 @@ class Processor
                        return [];
                }
 
+               $item['thr-parent-id'] = ItemURI::getIdByURI($item['thr-parent']);
+
                $item = self::processContent($activity, $item);
                if (empty($item)) {
                        Logger::info('Message was not processed');
@@ -489,14 +485,26 @@ class Processor
         * Fetch and process parent posts for the given activity
         *
         * @param array $activity
+        * @param bool  $in_background
         *
         * @return string
         */
-       private static function fetchParent(array $activity): string
+       private static function fetchParent(array $activity, bool $in_background = false): string
        {
+               if (self::isFetched($activity['reply-to-id'])) {
+                       Logger::info('Id is already fetched', ['id' => $activity['reply-to-id']]);
+                       return '';
+               }
+
+               self::addActivityId($activity['reply-to-id']);
+
+               if (!DI::config()->get('system', 'fetch_by_worker')) {
+                       $in_background = false;
+               }
+
                $recursion_depth = $activity['recursion-depth'] ?? 0;
 
-               if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
+               if (!$in_background && ($recursion_depth < DI::config()->get('system', 'max_recursion_depth'))) {
                        Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
                        $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
                        if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
@@ -525,17 +533,19 @@ class Processor
                        }
                } elseif (self::isActivityGone($activity['reply-to-id'])) {
                        Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
-                       if (!empty($activity['entry-id'])) {
+                       if ($in_background) {
+                               // fetching in background is done for all activities where we have got the conversation
+                               // There we only delete the single activity and not thr whole thread since we can store the
+                               // other posts in the thread even with missing posts.
+                               Queue::remove($activity);
+                       } elseif (!empty($activity['entry-id'])) {
                                Queue::deleteById($activity['entry-id']);
                        }
                        return '';
-               } else {
+               } elseif (!$in_background) {
                        Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
-               }
-
-               if (Queue::hasWorker($activity['worker-id'] ?? 0)) {
-                       Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
-                       return '';
+               } elseif ($in_background) {
+                       Logger::notice('Fetching is done in the background.', ['parent' => $activity['reply-to-id']]);
                }
 
                if (!Fetch::hasWorker($activity['reply-to-id'])) {
@@ -1056,6 +1066,10 @@ class Processor
                                Logger::info('Accepting post', ['uid' => $receiver, 'url' => $item['uri']]);
                        }
 
+                       if (!self::hasParents($item, $receiver)) {
+                               continue;
+                       }
+
                        if (($item['gravity'] != GRAVITY_ACTIVITY) && ($activity['object_type'] == 'as:Event')) {
                                $event_id = self::createEvent($activity, $item);
 
@@ -1096,6 +1110,56 @@ class Processor
                }
        }
 
+       /**
+        * Checks if there are parent posts for the given receiver.
+        * If not, then the system will try to add them.
+        *
+        * @param array $item
+        * @param integer $receiver
+        * @return boolean
+        */
+       private static function hasParents(array $item, int $receiver)
+       {
+               if (($receiver == 0) || ($item['gravity'] == GRAVITY_PARENT)) {
+                       return true;
+               }
+
+               $fields = ['causer-id' => $item['causer-id'] ?? $item['author-id'], 'post-reason' => Item::PR_FETCHED];
+
+               $has_parents = false;
+
+               if (!empty($item['parent-uri-id'])) {
+                       if (Post::exists(['uri-id' => $item['parent-uri-id'], 'uid' => $receiver])) {
+                               $has_parents = true;
+                       } elseif (Post::exists(['uri-id' => $item['parent-uri'], 'uid' => 0])) {
+                               $stored = Item::storeForUserByUriId($item['parent-uri-id'], $receiver, $fields);
+                               $has_parents = (bool)$stored;
+                               if ($stored) {
+                                       Logger::notice('Inserted missing parent post', ['stored' => $stored, 'uid' => $receiver, 'parent' => $item['parent-uri']]);
+                               } else {
+                                       Logger::notice('Parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'parent' => $item['parent-uri']]);
+                                       return false;
+                               }
+                       }
+               }
+
+               if (empty($item['parent-uri-id']) || ($item['thr-parent-id'] != $item['parent-uri-id'])) {
+                       if (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => $receiver])) {
+                               $has_parents = true;
+                       } elseif (Post::exists(['uri-id' => $item['thr-parent-id'], 'uid' => 0])) {
+                               $stored = Item::storeForUserByUriId($item['thr-parent-id'], $receiver, $fields);
+                               $has_parents = $has_parents || (bool)$stored;
+                               if ($stored) {
+                                       Logger::notice('Inserted missing thread parent post', ['stored' => $stored, 'uid' => $receiver, 'thread-parent' => $item['thr-parent']]);
+                               } else {
+                                       Logger::notice('Thread parent could not be added.', ['uid' => $receiver, 'uri' => $item['uri'], 'thread-parent' => $item['thr-parent']]);
+                               }
+                       }
+               }
+
+               return $has_parents;
+       }
+
        /**
         * Store tags and mentions into the tag table
         *
index df5115dcb41c09d114e7f3775b921ede1bacd3a7..eb751d5b31ef9780f5c89982b844302cc1baaf66 100644 (file)
@@ -55,7 +55,7 @@
 use Friendica\Database\DBA;
 
 if (!defined('DB_UPDATE_VERSION')) {
-       define('DB_UPDATE_VERSION', 1478);
+       define('DB_UPDATE_VERSION', 1479);
 }
 
 return [
@@ -1734,10 +1734,10 @@ return [
                ],
                "engine" => "MEMORY",
        ],
-       "processed-activity" => [
-               "comment" => "Id of processed activities",
+       "fetched-activity" => [
+               "comment" => "Id of fetched activities",
                "fields" => [
-                       "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of the incoming activity"],
+                       "object-id" => ["type" => "varbinary(255)", "not null" => "1", "primary" => "1", "comment" => "object id of fetched activity"],
                        "received" => ["type" => "datetime", "comment" => "Receiving date"],
                ],
                "indexes" => [