]> git.mxchange.org Git - friendica.git/blobdiff - src/Protocol/ActivityPub/Processor.php
Avoid to provess the same activity
[friendica.git] / src / Protocol / ActivityPub / Processor.php
index 60dd42cb49349915ef1baee30a72cd6286689398..92aa3b783334decb05b2aee759d66bd595d57f79 100644 (file)
@@ -58,6 +58,24 @@ use Friendica\Worker\Delivery;
 class Processor
 {
        const CACHEKEY_FETCH_ACTIVITY = 'processor:fetchMissingActivity:';
+       const CACHEKEY_JUST_FETCHED   = 'processor:isJustFetched:';
+
+       static $processed = [];
+
+       public static function addActivityId(string $id)
+       {
+               self::$processed[] = $id;
+               if (count(self::$processed) > 100) {
+                       self::$processed = array_slice(self::$processed, 1);
+               }
+               print_r(self::$processed);
+       }
+
+       public static function isProcessed(string $id): bool
+       {
+               return in_array($id, self::$processed);
+       }
+
        /**
         * Extracts the tag character (#, @, !) from mention links
         *
@@ -215,14 +233,13 @@ class Processor
                $item['edited'] = DateTimeFormat::utc($activity['updated']);
 
                $item = self::processContent($activity, $item);
-
-               self::storeAttachments($activity, $item);
-               self::storeQuestion($activity, $item);
-
                if (empty($item)) {
                        return;
                }
 
+               self::storeAttachments($activity, $item);
+               self::storeQuestion($activity, $item);
+
                Post\History::add($item['uri-id'], $item);
                Item::update($item, ['uri' => $activity['id']]);
 
@@ -265,13 +282,23 @@ class Processor
        /**
         * Prepares data for a message
         *
-        * @param array      $activity   Activity array
+        * @param array $activity      Activity array
+        * @param bool  $fetch_parents
+        *
         * @return array Internal item
+        *
         * @throws \Friendica\Network\HTTPException\InternalServerErrorException
         * @throws \ImagickException
         */
-       public static function createItem(array $activity): array
+       public static function createItem(array $activity, bool $fetch_parents = true): array
        {
+               if (self::isProcessed($activity['id'])) {
+                       Logger::info('Id is already processed', ['id' => $activity['id']]);
+                       return [];
+               }
+
+               self::addActivityId($activity['id']);
+
                $item = [];
                $item['verb'] = Activity::POST;
                $item['thr-parent'] = $activity['reply-to-id'];
@@ -300,42 +327,23 @@ class Processor
                        $conversation = [];
                }
 
-               if (empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
-                       $recursion_depth = $activity['recursion-depth'] ?? 0;
-                       Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
-                       if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
-                               $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
-                               if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
-                                       // Recursively delete this and all depending entries
-                                       Queue::deleteById($activity['entry-id']);
-                                       return [];
-                               }
-                               $fetch_by_worker = empty($result);
-                       } else {
-                               Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
-                               $fetch_by_worker = true;
-                       }
+               if (empty($activity['author']) && empty($activity['actor'])) {
+                       Logger::notice('Missing author and actor. We quit here.', ['activity' => $activity]);
+                       return [];
+               }
 
-                       if ($fetch_by_worker && Queue::hasWorker($activity)) {
-                               Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
-                               $fetch_by_worker = false;
-                               if (!empty($conversation)) {
-                                       return [];
-                               }
-                       }
+               if (!DI::config()->get('system', 'fetch_parents')) {
+                       $fetch_parents = false;
+               }
 
-                       if ($fetch_by_worker) {
-                               Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
-                               $activity['recursion-depth'] = 0;
-                               $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
-                               Queue::setWorkerId($activity, $wid);
-                               if (!empty($conversation)) {
-                                       return [];
-                               }
-                       } elseif (!empty($result)) {
+               if ($fetch_parents && empty($activity['directmessage']) && ($activity['id'] != $activity['reply-to-id']) && !Post::exists(['uri' => $activity['reply-to-id']])) {
+                       $result = self::fetchParent($activity);
+                       if (!empty($result)) {
                                if (($item['thr-parent'] != $result) && Post::exists(['uri' => $result])) {
                                        $item['thr-parent'] = $result;
                                }
+                       } elseif (empty($conversation)) {
+                               return [];
                        }
                }
 
@@ -362,11 +370,12 @@ class Processor
 
                if (!empty($activity['raw'])) {
                        $item['source'] = $activity['raw'];
-                       $item['protocol'] = Conversation::PARCEL_ACTIVITYPUB;
+               }
 
-                       if (isset($activity['push'])) {
-                               $item['direction'] = $activity['push'] ? Conversation::PUSH : Conversation::PULL;
-                       }
+               $item['protocol'] = Conversation::PARCEL_ACTIVITYPUB;
+
+               if (isset($activity['push'])) {
+                       $item['direction'] = $activity['push'] ? Conversation::PUSH : Conversation::PULL;
                }
 
                if (!empty($activity['from-relay'])) {
@@ -459,6 +468,94 @@ class Processor
                return $item;
        }
 
+       /**
+        * Fetch and process parent posts for the given activity
+        *
+        * @param array $activity
+        *
+        * @return string
+        */
+       private static function fetchParent(array $activity): string
+       {
+               if (self::hasJustBeenFetched($activity['reply-to-id'])) {
+                       Logger::notice('We just have tried to fetch this activity. We don\'t try it again.', ['parent' => $activity['reply-to-id']]);
+                       return '';
+               } 
+
+               $recursion_depth = $activity['recursion-depth'] ?? 0;
+
+               if ($recursion_depth < DI::config()->get('system', 'max_recursion_depth')) {
+                       Logger::notice('Parent not found. Try to refetch it.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+                       $result = self::fetchMissingActivity($activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+                       if (empty($result) && self::isActivityGone($activity['reply-to-id'])) {
+                               Logger::notice('The activity is gone, the queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
+                               if (!empty($activity['entry-id'])) {
+                                       Queue::deleteById($activity['entry-id']);
+                               }
+                               return '';
+                       } elseif (!empty($result)) {
+                               $exists = Post::exists(['uri' => [$result, $activity['reply-to-id']]]);
+                               if ($exists) {
+                                       Logger::notice('The activity has been fetched and created.', ['parent' => $result]);
+                                       return $result;
+                               } elseif (DI::config()->get('system', 'fetch_by_worker') || DI::config()->get('system', 'decoupled_receiver')) {
+                                       Logger::notice('The activity has been fetched and will hopefully be created later.', ['parent' => $result]);
+                               } else {
+                                       Logger::notice('The activity exists but has not been created, the queue entry will be deleted.', ['parent' => $result]);
+                                       if (!empty($activity['entry-id'])) {
+                                               Queue::deleteById($activity['entry-id']);
+                                       }
+                               }
+                               return '';
+                       }
+                       if (empty($result) && !DI::config()->get('system', 'fetch_by_worker')) {
+                               return '';
+                       }
+               } elseif (self::isActivityGone($activity['reply-to-id'])) {
+                       Logger::notice('The activity is gone. We will not spawn a worker. The queue entry will be deleted', ['parent' => $activity['reply-to-id']]);
+                       if (!empty($activity['entry-id'])) {
+                               Queue::deleteById($activity['entry-id']);
+                       }
+                       return '';
+               } else {
+                       Logger::notice('Recursion level is too high.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+               }
+
+               if (Queue::hasWorker($activity['worker-id'] ?? 0)) {
+                       Logger::notice('There is already a worker task to fetch the post.', ['id' => $activity['id'], 'parent' => $activity['reply-to-id']]);
+                       return '';
+               }
+
+               if (!Fetch::hasWorker($activity['reply-to-id'])) {
+                       Logger::notice('Fetching is done by worker.', ['parent' => $activity['reply-to-id'], 'recursion-depth' => $recursion_depth]);
+                       Fetch::add($activity['reply-to-id']);
+                       $activity['recursion-depth'] = 0;
+                       $wid = Worker::add(PRIORITY_HIGH, 'FetchMissingActivity', $activity['reply-to-id'], $activity, '', Receiver::COMPLETION_AUTO);
+                       Fetch::setWorkerId($activity['reply-to-id'], $wid);
+               } else {
+                       Logger::debug('Activity will already be fetched via a worker.', ['url' => $activity['reply-to-id']]);
+               }
+
+               return '';
+       }
+
+       /**
+        * Check if a given activity has recently been fetched
+        *
+        * @param string $url
+        * @return boolean
+        */
+       private static function hasJustBeenFetched(string $url): bool
+       {
+               $cachekey = self::CACHEKEY_JUST_FETCHED . $url;
+               $time = DI::cache()->get($cachekey);
+               if (is_null($time)) {
+                       DI::cache()->set($cachekey, time(), Duration::FIVE_MINUTES);
+                       return false;
+               }
+               return ($time + 300) > time();
+       }
+
        /**
         * Check if a given activity is no longer available
         *
@@ -900,6 +997,8 @@ class Processor
                                $item['post-reason'] = Item::PR_RELAY;
                        } elseif (!empty($activity['thread-completion'])) {
                                $item['post-reason'] = Item::PR_FETCHED;
+                       } elseif (in_array($item['post-reason'], [Item::PR_GLOBAL, Item::PR_NONE]) && !empty($activity['push'])) {
+                               $item['post-reason'] = Item::PR_PUSHED;
                        }
 
                        if ($item['isForum'] ?? false) {
@@ -970,9 +1069,6 @@ class Processor
                                $success = true;
                        } else {
                                Logger::notice('Item insertion aborted', ['uri' => $item['uri'], 'uid' => $item['uid']]);
-                               if (Item::isTooOld($item) || !Item::isValid($item)) {
-                                       Queue::remove($activity);
-                               }
                        }
 
                        if ($item['uid'] == 0) {
@@ -980,12 +1076,10 @@ class Processor
                        }
                }
 
-               if ($success) {
-                       Queue::remove($activity);
+               Queue::remove($activity);
 
-                       if (Queue::hasChildren($item['uri'])) {
-                               Worker::add(PRIORITY_HIGH, 'ProcessReplyByUri', $item['uri']);
-                       }
+               if ($success && Queue::hasChildren($item['uri']) && Post::exists(['uri' => $item['uri']])) {
+                       Queue::processReplyByUri($item['uri']);
                }
 
                // Store send a follow request for every reshare - but only when the item had been stored
@@ -1329,9 +1423,13 @@ class Processor
                        return '';
                }
 
-               ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer);
-
-               Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'object' => $activity['id']]);
+               if (($completion == Receiver::COMPLETION_RELAY) && Queue::exists($url, 'as:Create')) {
+                       Logger::notice('Activity has already been queued.', ['url' => $url, 'object' => $activity['id']]);
+               } elseif (ActivityPub\Receiver::processActivity($ldactivity, json_encode($activity), $uid, true, false, $signer, '', $completion)) {
+                       Logger::notice('Activity had been fetched and processed.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
+               } else {
+                       Logger::notice('Activity had been fetched and will be processed later.', ['url' => $url, 'entry' => $child['entry-id'] ?? 0, 'completion' => $completion, 'object' => $activity['id']]);
+               }
 
                return $activity['id'];
        }