]> git.mxchange.org Git - friendica.git/commitdiff
Recursively delete failed worker tasks
authorMichael <heluecht@pirati.ca>
Sun, 24 Jul 2022 09:26:52 +0000 (09:26 +0000)
committerMichael <heluecht@pirati.ca>
Sun, 24 Jul 2022 09:26:52 +0000 (09:26 +0000)
src/Protocol/ActivityPub/Processor.php
src/Protocol/ActivityPub/Queue.php
src/Protocol/ActivityPub/Receiver.php
src/Worker/Cron.php
src/Worker/FetchMissingActivity.php

index aa169ab028b31c508f06b27da8d60de08ed1e6eb..9d5f3f2f6e8150905758acad2099e6d0f875814f 100644 (file)
@@ -551,16 +551,13 @@ class Processor
                        }
                }
 
-               if ($activity['target_id'] != $actor['featured']) {
-                       return null;
-               }
-
-               $id = Contact::getIdForURL($activity['actor']);
-               if (empty($id)) {
-                       return null;
+               $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]);
+               if (empty($parent['uri-id'])) {
+                       if (self::fetchMissingActivity($activity['object_id'], $activity, '', Receiver::COMPLETION_AUTO)) {
+                               $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]);
+                       }
                }
 
-               $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id'], 'author-id' => $id]);
                if (!empty($parent['uri-id'])) {
                        return $parent['uri-id'];
                }
@@ -1191,20 +1188,27 @@ class Processor
                        return '';
                }
 
-               if (!empty($object['actor'])) {
-                       $object_actor = $object['actor'];
-               } elseif (!empty($object['attributedTo'])) {
-                       $object_actor = $object['attributedTo'];
-                       if (is_array($object_actor)) {
+               $signer = [];
+
+               if (!empty($object['attributedTo'])) {
+                       $attributed_to = $object['attributedTo'];
+                       if (is_array($attributed_to)) {
                                $compacted = JsonLD::compact($object);
-                               $object_actor = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
+                               $attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
                        }
+                       $signer[] = $attributed_to;     
+               }
+
+               if (!empty($object['actor'])) {
+                       $object_actor = $object['actor'];
+               } elseif (!empty($attributed_to)) {
+                       $object_actor = $attributed_to;
                } else {
                        // Shouldn't happen
                        $object_actor = '';
                }
 
-               $signer = [$object_actor];
+               $signer[] = $object_actor;
 
                if (!empty($child['author'])) {
                        $actor = $child['author'];
index d150f9f9cc439f69dd4278927ec70bb15b15f9bd..3d40d71638fb64646f0f42acbbaef5fe3ff33e37 100644 (file)
@@ -95,6 +95,42 @@ class Queue
                DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
        }
 
+       /**
+        * Delete all entries that depend on the given worker id
+        *
+        * @param integer $wid
+        * @return void
+        */
+       public static function deleteByWorkerId(int $wid)
+       {
+               $entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]);
+               while ($entry = DBA::fetch($entries)) {
+                       self::deleteById($entry['id']);
+               }
+               DBA::close($entries);
+       }
+
+       /**
+        * Delete recursively an entry and all their children
+        *
+        * @param integer $id
+        * @return void
+        */
+       private static function deleteById(int $id)
+       {
+               $entry = DBA::selectFirst('inbox-entry', ['id', 'object-id'], ['id' => $id]);
+               if (empty($entry)) {
+                       return;
+               }
+
+               $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
+               while ($child = DBA::fetch($children)) {
+                       self::deleteById($child['id']);
+               }
+               DBA::close($children);
+               DBA::delete('inbox-entry', ['id' => $entry['id']]);
+       }
+
        /**
         * Set the worker id for the queue entry
         *
@@ -143,8 +179,9 @@ class Queue
                $type     = $entry['type'];
                $push     = $entry['push'];
 
-               $activity['entry-id']  = $entry['id'];
-               $activity['worker-id'] = $entry['wid'];
+               $activity['entry-id']        = $entry['id'];
+               $activity['worker-id']       = $entry['wid'];
+               $activity['recursion-depth'] = 0;
 
                $receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]);
                while ($receiver = DBA::fetch($receivers)) {
@@ -166,8 +203,13 @@ class Queue
         */
        public static function processAll()
        {
-               $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
+               $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`wid` IS NULL"], ['order' => ['id' => true]]);
                while ($entry = DBA::fetch($entries)) {
+                       // We don't need to process entries that depend on already existing entries.
+                       if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
+                               continue;
+                       }
+                       Logger::debug('Process leftover entry', $entry);
                        self::process($entry['id']);
                }
        }
index 92ed31922bf991d10f7ab806591080cbd23afd7b..3e345107496b26404b0ad77bc4566888421d4bf1 100644 (file)
@@ -517,10 +517,6 @@ class Receiver
                        }
                }
 
-               if (($type == 'as:Add') && is_array($activity['as:object']) && (count($activity['as:object']) == 1)) {
-                       $trust_source = false;
-               }
-
                // $trust_source is called by reference and is set to true if the content was retrieved successfully
                $object_data = self::prepareObjectData($activity, $uid, $push, $trust_source);
                if (empty($object_data)) {
@@ -556,10 +552,6 @@ class Receiver
                        $object_data['thread-children-type'] = $activity['thread-children-type'];
                }
 
-               if (!empty($activity['recursion-depth'])) {
-                       $object_data['recursion-depth'] = $activity['recursion-depth'];
-               }
-
                // Internal flag for posts that arrived via relay
                if (!empty($activity['from-relay'])) {
                        $object_data['from-relay'] = $activity['from-relay'];
@@ -571,6 +563,10 @@ class Receiver
 
                $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push);
 
+               if (!empty($activity['recursion-depth'])) {
+                       $object_data['recursion-depth'] = $activity['recursion-depth'];
+               }
+
                if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) {
                        self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
                }
index 68ad2180145b6be648cc64e5bc013c11c5a5b1ec..c2109e66a3ea4bce82de5f568f4fa9c630a834be 100644 (file)
@@ -128,11 +128,14 @@ class Cron
                        if (DI::config()->get('system', 'optimize_tables')) {
                                Worker::add(PRIORITY_LOW, 'OptimizeTables');
                        }
-       
-                       DI::config()->set('system', 'last_cron_daily', time());
+
+                       // Process all unprocessed entries
+                       Queue::processAll();
 
                        // Resubscribe to relay servers
                        Relay::reSubscribe();
+
+                       DI::config()->set('system', 'last_cron_daily', time());
                }
 
                Logger::notice('end');
index ae0f8a7fbf167d8da167b5372aff5b680e1dca72..91b473a6ef81d02a6534ea69f92d1220015a923b 100644 (file)
@@ -23,6 +23,7 @@ namespace Friendica\Worker;
 
 use Friendica\Core\Logger;
 use Friendica\Core\Worker;
+use Friendica\DI;
 use Friendica\Protocol\ActivityPub;
 use Friendica\Protocol\ActivityPub\Queue;
 use Friendica\Protocol\ActivityPub\Receiver;
@@ -32,6 +33,8 @@ class FetchMissingActivity
        /**
         * Fetch missing activities
         * @param string $url Contact URL
+        * 
+        * @return void
         */
        public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL)
        {
@@ -39,10 +42,14 @@ class FetchMissingActivity
                $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
                if ($result) {
                        Logger::info('Successfully fetched missing activity', ['url' => $url]);
-                       Queue::processReplyByUri($url);
                } elseif (!Worker::defer()) {
-                       // @todo perform recursive deletion of all entries
                        Logger::info('Activity could not be fetched', ['url' => $url]);
+
+                       // recursively delete all entries that belong to this worker task
+                       $queue = DI::app()->getQueue();
+                       if (!empty($queue['id'])) {
+                               Queue::deleteByWorkerId($queue['id']);
+                       }
                } else {
                        Logger::info('Fetching deferred', ['url' => $url]);
                }