]> git.mxchange.org Git - friendica.git/commitdiff
Decouple the processor from the receiver
authorMichael <heluecht@pirati.ca>
Wed, 27 Jul 2022 20:59:42 +0000 (20:59 +0000)
committerMichael <heluecht@pirati.ca>
Wed, 27 Jul 2022 20:59:42 +0000 (20:59 +0000)
src/Protocol/ActivityPub/Processor.php
src/Protocol/ActivityPub/Queue.php
src/Protocol/ActivityPub/Receiver.php
src/Worker/ProcessQueue.php [new file with mode: 0644]

index 30f6627ba33f77b91290597e08b06f43fea05249..2025aba04ef83366aeae16b8f0c7ea7f530d25cf 100644 (file)
@@ -982,7 +982,6 @@ class Processor
                        Queue::remove($activity);
 
                        if (Queue::hasChildren($item['uri'])) {
-                               //Queue::processReplyByUri($item['uri']);
                                Worker::add(PRIORITY_HIGH, 'ProcessReplyByUri', $item['uri']);
                        }
                }
index b389626a3ec0a5bcc365b32c70eb05505d9959da..b8521325815fbe9919788e1e8cf54ff0b81336b0 100644 (file)
@@ -167,13 +167,14 @@ class Queue
         * Process the activity with the given id
         *
         * @param integer $id
-        * @return void
+        *
+        * @return bool
         */
-       public static function process(int $id)
+       public static function process(int $id): bool
        {
                $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
                if (empty($entry)) {
-                       return;
+                       return false;
                }
 
                Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
@@ -197,6 +198,8 @@ class Queue
                if (!Receiver::routeActivities($activity, $type, $push)) {
                        self::remove($activity);
                }
+
+               return true;
        }
 
        /**
index 9e7921699676172a8de596f8658f474612b12490..0515b24e7d8516ccf11520c4144f201eebd4456e 100644 (file)
@@ -28,6 +28,7 @@ use Friendica\Content\Text\Markdown;
 use Friendica\Core\Logger;
 use Friendica\Core\Protocol;
 use Friendica\Core\System;
+use Friendica\Core\Worker;
 use Friendica\DI;
 use Friendica\Model\Contact;
 use Friendica\Model\APContact;
@@ -36,6 +37,7 @@ use Friendica\Model\Post;
 use Friendica\Model\User;
 use Friendica\Protocol\Activity;
 use Friendica\Protocol\ActivityPub;
+use Friendica\Util\DateTimeFormat;
 use Friendica\Util\HTTPSignature;
 use Friendica\Util\JsonLD;
 use Friendica\Util\LDSignature;
@@ -597,6 +599,13 @@ class Receiver
                        return;
                }
 
+               if ($push) {
+                       // We delay by 5 seconds to allow to accumulate all receivers
+                       $delayed = date(DateTimeFormat::MYSQL, time() + 5);
+                       Worker::add(['priority' => PRIORITY_HIGH, 'delayed' => $delayed], 'ProcessQueue', $object_data['entry-id']);
+                       return;
+               }
+
                if (!empty($activity['recursion-depth'])) {
                        $object_data['recursion-depth'] = $activity['recursion-depth'];
                }
diff --git a/src/Worker/ProcessQueue.php b/src/Worker/ProcessQueue.php
new file mode 100644 (file)
index 0000000..dee24ee
--- /dev/null
@@ -0,0 +1,42 @@
+<?php
+/**
+ * @copyright Copyright (C) 2010-2022, the Friendica project
+ *
+ * @license GNU AGPL version 3 or any later version
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+namespace Friendica\Worker;
+
+use Friendica\Core\Logger;
+use Friendica\Protocol\ActivityPub\Queue;
+
+class ProcessQueue
+{
+       /**
+        * Process queue entry
+        *
+        * @param int $id queue id
+        *
+        * @return void
+        */
+       public static function execute(int $id)
+       {
+               Logger::info('Start processing queue entry', ['id' => $id]);
+               $result = Queue::process($id);
+               Logger::info('Successfully processed queue entry', ['result' => $result, 'id' => $id]);
+       }
+}