]> git.mxchange.org Git - friendica.git/commitdiff
Fetching of missing posts is reworked
authorMichael <heluecht@pirati.ca>
Thu, 21 Jul 2022 05:16:14 +0000 (05:16 +0000)
committerMichael <heluecht@pirati.ca>
Thu, 21 Jul 2022 05:16:14 +0000 (05:16 +0000)
src/Model/Item.php
src/Module/Debug/ActivityPubConversion.php
src/Protocol/ActivityPub.php
src/Protocol/ActivityPub/FetchQueue.php [deleted file]
src/Protocol/ActivityPub/FetchQueueItem.php [deleted file]
src/Protocol/ActivityPub/Processor.php
src/Protocol/ActivityPub/Queue.php [new file with mode: 0644]
src/Protocol/ActivityPub/Receiver.php
src/Worker/FetchMissingActivity.php [new file with mode: 0644]
static/dbstructure.config.php

index 01ea942c869fec0c5e01ca45d7f17deb76d9d1af..7c3eea9eb12c662c0e104cca2b5a4bbcabde77f6 100644 (file)
@@ -3410,9 +3410,7 @@ class Item
                        return is_numeric($hookData['item_id']) ? $hookData['item_id'] : 0;
                }
 
-               $fetchQueue = new ActivityPub\FetchQueue();
-               $fetched_uri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $uri);
-               $fetchQueue->process();
+               $fetched_uri = ActivityPub\Processor::fetchMissingActivity($uri);
 
                if ($fetched_uri) {
                        $item_id = self::searchByLink($fetched_uri, $uid);
index 5fa9a8b409dc58b49c698d2fd3976e1d3f4dcdbc..ec7fee3f462ffb2e8a9a9080075efff8b291b889 100644 (file)
@@ -123,7 +123,7 @@ class ActivityPubConversion extends BaseModule
                                        'content' => visible_whitespace(var_export($object_data, true))
                                ];
 
-                               $item = ActivityPub\Processor::createItem(new ActivityPub\FetchQueue(), $object_data);
+                               $item = ActivityPub\Processor::createItem($object_data);
 
                                $results[] = [
                                        'title'   => DI::l10n()->t('Result Item'),
index 858f837e8f66db36ef266b1003d2ff830be8988c..93204e81d3722da96c65810566a4e27e87cf8181 100644 (file)
@@ -25,7 +25,6 @@ use Friendica\Core\Logger;
 use Friendica\Core\Protocol;
 use Friendica\Model\APContact;
 use Friendica\Model\User;
-use Friendica\Protocol\ActivityPub\FetchQueue;
 use Friendica\Util\HTTPSignature;
 use Friendica\Util\JsonLD;
 
@@ -224,14 +223,10 @@ class ActivityPub
                        $items = [];
                }
 
-               $fetchQueue = new FetchQueue();
-
                foreach ($items as $activity) {
                        $ldactivity = JsonLD::compact($activity);
-                       ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, '', $uid, true);
+                       ActivityPub\Receiver::processActivity($ldactivity, '', $uid, true);
                }
-
-               $fetchQueue->process();
        }
 
        /**
diff --git a/src/Protocol/ActivityPub/FetchQueue.php b/src/Protocol/ActivityPub/FetchQueue.php
deleted file mode 100644 (file)
index dfaa338..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-<?php
-/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-namespace Friendica\Protocol\ActivityPub;
-
-/**
- * This class prevents maximum function nesting errors by flattening recursive calls to Processor::fetchMissingActivity
- */
-class FetchQueue
-{
-       /** @var FetchQueueItem[] */
-       protected $queue = [];
-
-       public function push(FetchQueueItem $item)
-       {
-               array_push($this->queue, $item);
-       }
-
-       /**
-        * Processes missing activities one by one. It is possible that a processing call will add additional missing
-        * activities, they will be processed in subsequent iterations of the loop.
-        *
-        * Since this process is self-contained, it isn't suitable to retrieve the URI of a single activity.
-        *
-        * The simplest way to get the URI of the first activity and ensures all the parents are fetched is this way:
-        *
-        * $fetchQueue = new ActivityPub\FetchQueue();
-        * $fetchedUri = ActivityPub\Processor::fetchMissingActivity($fetchQueue, $activityUri);
-        * $fetchQueue->process();
-        */
-       public function process()
-       {
-               while (count($this->queue)) {
-                       $fetchQueueItem = array_pop($this->queue);
-
-                       call_user_func_array([Processor::class, 'fetchMissingActivity'], array_merge([$this], $fetchQueueItem->toParameters()));
-               }
-       }
-}
diff --git a/src/Protocol/ActivityPub/FetchQueueItem.php b/src/Protocol/ActivityPub/FetchQueueItem.php
deleted file mode 100644 (file)
index 716c231..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-<?php
-/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
- *
- * @license GNU AGPL version 3 or any later version
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <https://www.gnu.org/licenses/>.
- *
- */
-
-namespace Friendica\Protocol\ActivityPub;
-
-class FetchQueueItem
-{
-       /** @var string */
-       private $url;
-       /** @var array */
-       private $child;
-       /** @var string */
-       private $relay_actor;
-       /** @var int */
-       private $completion;
-
-       /**
-        * This constructor matches the signature of Processor::fetchMissingActivity except for the default $completion value
-        *
-        * @param string $url
-        * @param array  $child
-        * @param string $relay_actor
-        * @param int    $completion
-        */
-       public function __construct(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_AUTO)
-       {
-               $this->url         = $url;
-               $this->child       = $child;
-               $this->relay_actor = $relay_actor;
-               $this->completion  = $completion;
-       }
-
-       /**
-        * Array meant to be used in call_user_function_array([Processor::class, 'fetchMissingActivity']). Caller needs to
-        * provide an instance of a FetchQueue that isn't included in these parameters.
-        *
-        * @see FetchQueue::process()
-        * @return array
-        */
-       public function toParameters(): array
-       {
-               return [$this->url, $this->child, $this->relay_actor, $this->completion];
-       }
-}
index e6ff43f06acd807e53c1a260e21a5c26931d0a45..60793f80262db9db170817ceea1f94edebc8c881 100644 (file)
@@ -189,17 +189,16 @@ class Processor
        /**
         * Updates a message
         *
-        * @param FetchQueue $fetchQueue
         * @param array      $activity   Activity array
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @throws \ImagickException
         */
-       public static function updateItem(FetchQueue $fetchQueue, array $activity)
+       public static function updateItem(array $activity)
        {
                $item = Post::selectFirst(['uri', 'uri-id', 'thr-parent', 'gravity', 'post-type'], ['uri' => $activity['id']]);
                if (!DBA::isResult($item)) {
                        Logger::warning('No existing item, item will be created', ['uri' => $activity['id']]);
-                       $item = self::createItem($fetchQueue, $activity);
+                       $item = self::createItem($activity);
                        if (empty($item)) {
                                return;
                        }
@@ -223,7 +222,7 @@ class Processor
                Post\History::add($item['uri-id'], $item);
                Item::update($item, ['uri' => $activity['id']]);
 
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
 
                if ($activity['object_type'] == 'as:Event') {
                        $posts = Post::select(['event-id', 'uid'], ["`uri` = ? AND `event-id` > ?", $activity['id'], 0]);
@@ -262,13 +261,12 @@ class Processor
        /**
         * Prepares data for a message
         *
-        * @param FetchQueue $fetchQueue
         * @param array      $activity   Activity array
         * @return array Internal item
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @throws \ImagickException
         */
-       public static function createItem(FetchQueue $fetchQueue, array $activity): array
+       public static function createItem(array $activity): array
        {
                $item = [];
                $item['verb'] = Activity::POST;
@@ -283,13 +281,15 @@ class Processor
                }
 
                if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
-                       Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id']]);
-                       /**
-                        * Instead of calling recursively self::fetchMissingActivity which can hit PHP's default function nesting
-                        * limit of 256 recursive calls, we push the parent activity fetch parameters in this queue. The initial
-                        * caller is responsible for processing the remaining queue once the original activity has been processed.
-                        */
-                       $fetchQueue->push(new FetchQueueItem($activity['reply-to-id'], $activity));
+                       $recursion_depth = $activity['recursion-depth'] ?? 0;
+                       Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+                       if ($recursion_depth < 10) {
+                               self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+                       } else {
+                               Logger::notice('Recursion level is too high, fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+                               Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+                               return [];
+                       }
                }
 
                $item['diaspora_signed_text'] = $activity['diaspora:comment'] ?? '';
@@ -428,7 +428,7 @@ class Processor
 
                Logger::info('Deleting item', ['object' => $activity['object_id'], 'owner'  => $owner]);
                Item::markForDeletion(['uri' => $activity['object_id'], 'owner-id' => $owner]);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -464,15 +464,14 @@ class Processor
        /**
         * Prepare the item array for an activity
         *
-        * @param FetchQueue $fetchQueue
         * @param array      $activity   Activity array
         * @param string     $verb       Activity verb
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @throws \ImagickException
         */
-       public static function createActivity(FetchQueue $fetchQueue, array $activity, string $verb)
+       public static function createActivity(array $activity, string $verb)
        {
-               $item = self::createItem($fetchQueue, $activity);
+               $item = self::createItem($activity);
                if (empty($item)) {
                        return;
                }
@@ -546,7 +545,7 @@ class Processor
                Logger::debug('Add post to featured collection', ['uri-id' => $uriid]);
 
                Post\Collection::add($uriid, Post\Collection::FEATURED);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -564,7 +563,7 @@ class Processor
                Logger::debug('Remove post from featured collection', ['uri-id' => $uriid]);
 
                Post\Collection::remove($uriid, Post\Collection::FEATURED);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -652,13 +651,12 @@ class Processor
                        $item['body'] = Item::improveSharedDataInBody($item);
                } else {
                        if (empty($activity['directmessage']) && ($item['thr-parent'] != $item['uri']) && ($item['gravity'] == GRAVITY_COMMENT)) {
-                               $item_private = !in_array(0, $activity['item_receiver']);
                                $parent = Post::selectFirst(['id', 'uri-id', 'private', 'author-link', 'alias'], ['uri' => $item['thr-parent']]);
                                if (!DBA::isResult($parent)) {
                                        Logger::warning('Unknown parent item.', ['uri' => $item['thr-parent']]);
                                        return false;
                                }
-                               if ($item_private && ($parent['private'] != Item::PRIVATE)) {
+                               if (($parent['private'] == Item::PRIVATE) && ($parent['private'] != Item::PRIVATE)) {
                                        Logger::warning('Item is private but the parent is not. Dropping.', ['item-uri' => $item['uri'], 'thr-parent' => $item['thr-parent']]);
                                        return false;
                                }
@@ -783,6 +781,7 @@ class Processor
                }
 
                $stored = false;
+               $success = false;
                ksort($activity['receiver']);
 
                if (!self::isSolicitedMessage($activity, $item)) {
@@ -895,7 +894,7 @@ class Processor
                        $item_id = Item::insert($item);
                        if ($item_id) {
                                Logger::info('Item insertion successful', ['user' => $item['uid'], 'item_id' => $item_id]);
-                               Receiver::removeFromQueue($activity);
+                               $success = true;
                        } else {
                                Logger::notice('Item insertion aborted', ['user' => $item['uid']]);
                        }
@@ -905,6 +904,11 @@ class Processor
                        }
                }
 
+               if ($success) {
+                       Queue::remove($activity);
+                       Queue::processReplyByUri($item['uri']);
+               }
+
                // Store send a follow request for every reshare - but only when the item had been stored
                if ($stored && ($item['private'] != Item::PRIVATE) && ($item['gravity'] == GRAVITY_PARENT) && !empty($item['author-link']) && ($item['author-link'] != $item['owner-link'])) {
                        $author = APContact::getByURL($item['owner-link'], false);
@@ -1121,7 +1125,6 @@ class Processor
        /**
         * Fetches missing posts
         *
-        * @param FetchQueue $fetchQueue
         * @param string     $url         message URL
         * @param array      $child       activity array with the child of this message
         * @param string     $relay_actor Relay actor
@@ -1130,7 +1133,7 @@ class Processor
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @throws \ImagickException
         */
-       public static function fetchMissingActivity(FetchQueue $fetchQueue, string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
+       public static function fetchMissingActivity(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL): string
        {
                if (!empty($child['receiver'])) {
                        $uid = ActivityPub\Receiver::getFirstUserFromReceivers($child['receiver']);
@@ -1140,7 +1143,7 @@ class Processor
 
                $object = ActivityPub::fetchContent($url, $uid);
                if (empty($object)) {
-                       Logger::notice('Activity was not fetchable, aborting.', ['url' => $url]);
+                       Logger::notice('Activity was not fetchable, aborting.', ['url' => $url, 'uid' => $uid]);
                        return '';
                }
 
@@ -1192,6 +1195,8 @@ class Processor
 
                $ldactivity = JsonLD::compact($activity);
 
+               $ldactivity['recursion-depth'] = !empty($child['recursion-depth']) ? $child['recursion-depth'] + 1 : 1;
+
                if (!empty($relay_actor)) {
                        $ldactivity['thread-completion'] = $ldactivity['from-relay'] = Contact::getIdForURL($relay_actor);
                        $ldactivity['completion-mode']   = Receiver::COMPLETION_RELAY;
@@ -1211,7 +1216,7 @@ class Processor
                        return '';
                }
 
-               ActivityPub\Receiver::processActivity($fetchQueue, $ldactivity, json_encode($activity), $uid, true, false, $signer);
+               ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer);
 
                Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]);
 
@@ -1354,7 +1359,7 @@ class Processor
 
                Logger::info('Updating profile', ['object' => $activity['object_id']]);
                Contact::updateFromProbeByURL($activity['object_id']);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -1383,7 +1388,7 @@ class Processor
                DBA::close($contacts);
 
                Logger::info('Deleted contact', ['object' => $activity['object_id']]);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -1466,7 +1471,7 @@ class Processor
                $condition = ['id' => $cid];
                Contact::update($fields, $condition);
                Logger::info('Accept contact request', ['contact' => $cid, 'user' => $uid]);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -1500,7 +1505,7 @@ class Processor
                } else {
                        Logger::info('Rejected contact request', ['contact' => $cid, 'user' => $uid]);
                }
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -1526,7 +1531,7 @@ class Processor
                }
 
                Item::markForDeletion(['uri' => $activity['object_id'], 'author-id' => $author_id, 'gravity' => GRAVITY_ACTIVITY]);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
@@ -1563,7 +1568,7 @@ class Processor
 
                Contact::removeFollower($contact);
                Logger::info('Undo following request', ['contact' => $cid, 'user' => $uid]);
-               Receiver::removeFromQueue($activity);
+               Queue::remove($activity);
        }
 
        /**
diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php
new file mode 100644 (file)
index 0000000..0cb8472
--- /dev/null
@@ -0,0 +1,114 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Protocol\ActivityPub;
+
+use Friendica\Core\Logger;
+use Friendica\Database\Database;
+use Friendica\Database\DBA;
+use Friendica\Util\DateTimeFormat;
+
+/**
+ * This class handles the processing of incoming posts
+ */
+class Queue
+{
+       public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
+       {
+               $fields = [
+                       'activity-id' => $activity['id'],
+                       'object-id'   => $activity['object_id'],
+                       'type'        => $type,
+                       'object-type' => $activity['object_type'],
+                       'activity'    => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
+                       'received'    => DateTimeFormat::utcNow(),
+                       'push'        => $push,
+               ];
+
+               if (!empty($activity['reply-to-id'])) {
+                       $fields['in-reply-to-id'] = $activity['reply-to-id'];
+               }
+
+               if (!empty($activity['object_object_type'])) {
+                       $fields['object-object-type'] = $activity['object_object_type'];
+               }
+
+               if (!empty($http_signer)) {
+                       $fields['signer'] = $http_signer;
+               }
+
+               DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
+
+               $queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
+               if (!empty($queue['id'])) {
+                       $activity['entry-id'] = $queue['id'];
+                       DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
+               }
+               return $activity;
+       }
+
+       public static function remove(array $activity = [])
+       {
+               if (empty($activity['entry-id'])) {
+                       return;
+               }
+               DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
+               //echo "Delete ".$activity['entry-id']."\n";
+
+       }
+
+       public static function process(int $id)
+       {
+               $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
+               if (empty($entry)) {
+                       return;
+               }
+       
+               Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
+
+               $activity   = json_decode($entry['activity'], true);
+               $type       = $entry['type'];
+               $push       = $entry['push'];
+
+               $activity['entry-id'] = $entry['id'];
+
+               if (!Receiver::routeActivities($activity, $type, $push)) {
+                       self::remove($activity);
+               }
+       }
+
+       public static function processAll()
+       {
+               $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
+               while ($entry = DBA::fetch($entries)) {
+                       echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n";
+                       self::process($entry['id']);
+               }
+       }
+
+       public static function processReplyByUri(string $uri)
+       {
+               $entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]);
+               while ($entry = DBA::fetch($entries)) {
+                       self::process($entry['id']);
+               }
+       }
+}
index 4d7309001604e36e72378960b30c267661c18eb8..cd583132e6b2a2170a65741b3cfd3dfd12ac64db 100644 (file)
@@ -28,7 +28,6 @@ use Friendica\Content\Text\Markdown;
 use Friendica\Core\Logger;
 use Friendica\Core\Protocol;
 use Friendica\Core\System;
-use Friendica\Database\Database;
 use Friendica\DI;
 use Friendica\Model\Contact;
 use Friendica\Model\APContact;
@@ -37,7 +36,6 @@ use Friendica\Model\Post;
 use Friendica\Model\User;
 use Friendica\Protocol\Activity;
 use Friendica\Protocol\ActivityPub;
-use Friendica\Util\DateTimeFormat;
 use Friendica\Util\HTTPSignature;
 use Friendica\Util\JsonLD;
 use Friendica\Util\LDSignature;
@@ -155,46 +153,7 @@ class Receiver
                        $trust_source = false;
                }
 
-               $fetchQueue = new FetchQueue();
-               self::processActivity($fetchQueue, $ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
-               $fetchQueue->process();
-       }
-
-       private static function enqueuePost(array $ldactivity = [], string $type, int $uid, string $http_signer): array
-       {
-               $fields = [
-                       'activity-id' => $ldactivity['id'],
-                       'object-id' => $ldactivity['object_id'],
-                       'type' => $type,
-                       'object-type' => $ldactivity['object_type'],
-                       'activity' => json_encode($ldactivity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
-                       'received' => DateTimeFormat::utcNow(),
-               ];
-
-               if (!empty($ldactivity['object_object_type'])) {
-                       $fields['object-object-type'] = $ldactivity['object_object_type'];
-               }
-
-               if (!empty($http_signer)) {
-                       $fields['signer'] = $http_signer;
-               }
-
-               DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);
-
-               $queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $ldactivity['id']]);
-               if (!empty($queue['id'])) {
-                       $ldactivity['entry-id'] = $queue['id'];
-                       DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
-               }
-               return $ldactivity;
-       }
-
-       public static function removeFromQueue(array $activity = [])
-       {
-               if (empty($activity['entry-id'])) {
-                       return;
-               }
-               DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
+               self::processActivity($ldactivity, $body, $uid, $trust_source, true, $signer, $http_signer);
        }
 
        /**
@@ -242,16 +201,12 @@ class Receiver
                        return;
                }
 
-               $fetchQueue = new FetchQueue();
-
-               $id = Processor::fetchMissingActivity($fetchQueue, $object_id, [], $actor, self::COMPLETION_RELAY);
+               $id = Processor::fetchMissingActivity($object_id, [], $actor, self::COMPLETION_RELAY);
                if (empty($id)) {
                        Logger::notice('Relayed message had not been fetched', ['id' => $object_id, 'actor' => $actor]);
                        return;
                }
 
-               $fetchQueue->process();
-
                $item_id = Item::searchByLink($object_id);
                if ($item_id) {
                        Logger::info('Relayed message had been fetched and stored', ['id' => $object_id, 'item' => $item_id, 'actor' => $actor]);
@@ -518,7 +473,6 @@ class Receiver
        /**
         * Processes the activity object
         *
-        * @param FetchQueue $fetchQueue
         * @param array      $activity     Array with activity data
         * @param string     $body         The unprocessed body
         * @param int|null   $uid          User ID
@@ -528,7 +482,7 @@ class Receiver
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @throws \ImagickException
         */
-       public static function processActivity(FetchQueue $fetchQueue, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '')
+       public static function processActivity(array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [], string $http_signer = '')
        {
                $type = JsonLD::fetchElement($activity, '@type');
                if (!$type) {
@@ -588,7 +542,7 @@ class Receiver
                if (!empty($activity['thread-completion'])) {
                        $object_data['thread-completion'] = $activity['thread-completion'];
                }
-
+               
                if (!empty($activity['completion-mode'])) {
                        $object_data['completion-mode'] = $activity['completion-mode'];
                }
@@ -597,36 +551,56 @@ class Receiver
                        $object_data['thread-children-type'] = $activity['thread-children-type'];
                }
 
+               if (!empty($activity['recursion-depth'])) {
+                       $object_data['recursion-depth'] = $activity['recursion-depth'];
+               }
+
                // Internal flag for posts that arrived via relay
                if (!empty($activity['from-relay'])) {
                        $object_data['from-relay'] = $activity['from-relay'];
                }
 
-               $object_data = self::enqueuePost($object_data, $type, $uid, $http_signer);
+               if ($type == 'as:Announce') {
+                       $object_data['object_activity'] = $activity;
+               }
+
+               $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push);
 
                if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) {
                        self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
                }
 
+               if (!self::routeActivities($object_data, $type, $push)) {
+                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                       //if (!DI::config()->get('debug', 'ap_log_unknown')) {
+                       //      Queue::remove($object_data);
+                       //}
+               }
+       }
+
+       public static function routeActivities($object_data, $type, $push)
+       {
+               $activity = $object_data['object_activity']     ?? [];
+
                switch ($type) {
                        case 'as:Create':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       $item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
+                                       $item = ActivityPub\Processor::createItem($object_data);
                                        ActivityPub\Processor::postItem($object_data, $item);
                                } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
                                        // Unhandled Peertube activity
-                                       self::removeFromQueue($object_data);
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:Invite':
                                if (in_array($object_data['object_type'], ['as:Event'])) {
-                                       $item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
+                                       $item = ActivityPub\Processor::createItem($object_data);
                                        ActivityPub\Processor::postItem($object_data, $item);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -637,17 +611,19 @@ class Receiver
                                        ActivityPub\Processor::addToFeaturedCollection($object_data);
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:Announce':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
+                                       $actor = JsonLD::fetchElement($activity, 'as:actor', '@id');
                                        $object_data['thread-completion'] = Contact::getIdForURL($actor);
                                        $object_data['completion-mode']   = self::COMPLETION_ANNOUCE;
 
-                                       $item = ActivityPub\Processor::createItem($fetchQueue, $object_data);
+                                       $item = ActivityPub\Processor::createItem($object_data);
                                        if (empty($item)) {
                                                return;
                                        }
@@ -655,61 +631,64 @@ class Receiver
                                        $item['post-reason'] = Item::PR_ANNOUNCEMENT;
                                        ActivityPub\Processor::postItem($object_data, $item);
 
-                                       $announce_object_data = self::processObject($activity);
-                                       $announce_object_data['name'] = $type;
-                                       $announce_object_data['author'] = JsonLD::fetchElement($activity, 'as:actor', '@id');
-                                       $announce_object_data['object_id'] = $object_data['object_id'];
-                                       $announce_object_data['object_type'] = $object_data['object_type'];
-                                       $announce_object_data['push'] = $push;
+                                       if (!empty($activity)) {
+                                               $announce_object_data = self::processObject($activity);
+                                               $announce_object_data['name'] = $type;
+                                               $announce_object_data['author'] = $actor;
+                                               $announce_object_data['object_id'] = $object_data['object_id'];
+                                               $announce_object_data['object_type'] = $object_data['object_type'];
+                                               $announce_object_data['push'] = $push;
 
-                                       if (!empty($body)) {
-                                               $announce_object_data['raw'] = $body;
-                                       }
-
-                                       ActivityPub\Processor::createActivity($fetchQueue, $announce_object_data, Activity::ANNOUNCE);
+                                               if (!empty($object_data['raw'])) {
+                                                       $announce_object_data['raw'] = $object_data['raw'];
+                                               }
+                                               ActivityPub\Processor::createActivity($announce_object_data, Activity::ANNOUNCE);
+                                       } else echo "\n***************************\n";
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:Like':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::LIKE);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::LIKE);
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:Dislike':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::DISLIKE);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::DISLIKE);
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:TentativeAccept':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDMAYBE);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::ATTENDMAYBE);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:Update':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::updateItem($fetchQueue, $object_data);
+                                       ActivityPub\Processor::updateItem($object_data);
                                } elseif (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
                                        ActivityPub\Processor::updatePerson($object_data);
                                } elseif (in_array($object_data['object_type'], ['pt:CacheFile'])) {
                                        // Unhandled Peertube activity
-                                       self::removeFromQueue($object_data);
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -720,8 +699,9 @@ class Receiver
                                        ActivityPub\Processor::deletePerson($object_data);
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -729,7 +709,7 @@ class Receiver
                                if (in_array($object_data['object_type'], self::ACCOUNT_TYPES)) {
                                        ActivityPub\Processor::blockAccount($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -738,8 +718,9 @@ class Receiver
                                        ActivityPub\Processor::removeFromFeaturedCollection($object_data);                                      
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -748,9 +729,9 @@ class Receiver
                                        ActivityPub\Processor::followUser($object_data);
                                } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
                                        $object_data['reply-to-id'] = $object_data['object_id'];
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::FOLLOW);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::FOLLOW);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -758,9 +739,9 @@ class Receiver
                                if ($object_data['object_type'] == 'as:Follow') {
                                        ActivityPub\Processor::acceptFollowUser($object_data);
                                } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTEND);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::ATTEND);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -768,9 +749,9 @@ class Receiver
                                if ($object_data['object_type'] == 'as:Follow') {
                                        ActivityPub\Processor::rejectFollowUser($object_data);
                                } elseif (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::ATTENDNO);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::ATTENDNO);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
@@ -793,42 +774,42 @@ class Receiver
                                } elseif (in_array($object_data['object_type'], array_merge(self::ACTIVITY_TYPES, ['as:Announce', 'as:Create', ''])) &&
                                        empty($object_data['object_object_type'])) {
                                        // We cannot detect the target object. So we can ignore it.
-                                       self::removeFromQueue($object_data);
                                } elseif (in_array($object_data['object_type'], ['as:Create']) &&
                                        in_array($object_data['object_object_type'], ['pt:CacheFile'])) {
                                        // Unhandled Peertube activity
-                                       self::removeFromQueue($object_data);
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'as:View':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::VIEW);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::VIEW);
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. Most likely we don't have it here. We ignore this activity.
-                                       self::removeFromQueue($object_data);
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
 
                        case 'litepub:EmojiReact':
                                if (in_array($object_data['object_type'], self::CONTENT_TYPES)) {
-                                       ActivityPub\Processor::createActivity($fetchQueue, $object_data, Activity::EMOJIREACT);
+                                       ActivityPub\Processor::createActivity($object_data, Activity::EMOJIREACT);
                                } elseif ($object_data['object_type'] == '') {
                                        // The object type couldn't be determined. We don't have it and we can't fetch it. We ignore this activity.
+                                       Queue::remove($object_data);
                                } else {
-                                       self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
+                                       return false;
                                }
                                break;
        
                        default:
                                Logger::info('Unknown activity: ' . $type . ' ' . $object_data['object_type']);
-                               self::storeUnhandledActivity(true, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
-                               break;
+                               return false;
                }
+               return true;
        }
 
        /**
@@ -847,11 +828,6 @@ class Receiver
         */
        private static function storeUnhandledActivity(bool $unknown, string $type, array $object_data, array $activity, string $body = '', int $uid = null, bool $trust_source = false, bool $push = false, array $signer = [])
        {
-               if (!DI::config()->get('debug', 'ap_log_unknown')) {
-                       self::removeFromQueue($activity);
-                       return;
-               }
-
                $file = ($unknown  ? 'unknown-' : 'unhandled-') . str_replace(':', '-', $type) . '-';
        
                if (!empty($object_data['object_type'])) {
diff --git a/src/Worker/FetchMissingActivity.php b/src/Worker/FetchMissingActivity.php
new file mode 100644 (file)
index 0000000..5aabe89
--- /dev/null
@@ -0,0 +1,40 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Worker;
+
+use Friendica\Core\Logger;
+use Friendica\Protocol\ActivityPub;
+use Friendica\Protocol\ActivityPub\Receiver;
+
+class FetchMissingActivity
+{
+       /**
+        * Fetch missing activities
+        * @param string $url Contact URL
+        */
+       public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL)
+       {
+               Logger::info('Start fetching missing activity', ['url' => $url]);
+               $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
+               Logger::info('Finished fetching missing activity', ['url' => $url, 'result' => $result]);
+       }
+}
index c1f1cfcbc1a538f47a9174793eb3d94dd70969a7..0db5681871a642fd1cab37b195f964cf04917dfa 100644 (file)
@@ -790,12 +790,14 @@ return [
                        "id" => ["type" => "int unsigned", "not null" => "1", "extra" => "auto_increment", "primary" => "1", "comment" => "sequential ID"],
                        "activity-id" => ["type" => "varbinary(255)", "comment" => "id of the incoming activity"],
                        "object-id" => ["type" => "varbinary(255)", "comment" => ""],
+                       "in-reply-to-id" => ["type" => "varbinary(255)", "comment" => ""],
                        "type" => ["type" => "varchar(64)", "comment" => "Type of the activity"],
                        "object-type" => ["type" => "varchar(64)", "comment" => "Type of the object activity"],
                        "object-object-type" => ["type" => "varchar(64)", "comment" => "Type of the object's object activity"],                 
                        "received" => ["type" => "datetime", "comment" => "Receiving date"],
                        "activity" => ["type" => "mediumtext", "comment" => "The JSON activity"],
                        "signer" => ["type" => "varchar(255)", "comment" => ""],
+                       "push" => ["type" => "boolean", "not null" => "1", "default" => "0", "comment" => ""],
                ],
                "indexes" => [
                        "PRIMARY" => ["id"],