]> git.mxchange.org Git - friendica.git/commitdiff
Jetstream: Only complete threads when the drift allows it
authorMichael <heluecht@pirati.ca>
Thu, 12 Jun 2025 02:42:18 +0000 (02:42 +0000)
committerMichael <heluecht@pirati.ca>
Sun, 15 Jun 2025 08:39:11 +0000 (08:39 +0000)
src/Protocol/ATProtocol/Jetstream.php
src/Protocol/ATProtocol/Processor.php

index a4771cae44146e2ec7ed9700db0a389f5b7ca3ed..128bf888c9557ae8b2dc89632c9d9c3b6f860a8e 100755 (executable)
@@ -38,6 +38,23 @@ use stdClass;
  */
 class Jetstream
 {
+       /** 
+        * Maximum drift values in seconds for the threads completion.
+        * If the drift is higher than this value, only a few posts in a thread will be fetched.
+        */
+       const MAX_DRIFT_THREAD_COMPLETION = 30;
+       /**
+        * Maximum drift values in seconds for the DID cap.
+        * If the drift is higher than this value, the number of DIDs will be capped.
+        */
+       const MAX_DRIFT_DID_CAP           = 60;
+       /**
+        * Maximum drift values in seconds for creating posts.
+        * If the drift is higher than this value, posts and reshares will not be created.
+        * The other collections will still be processed.
+        */
+       const MAX_DRIFT_CREATE_POSTS      = 1200;
+
        private $uids   = [];
        private $self   = [];
        private $capped = false;
@@ -362,7 +379,7 @@ class Jetstream
                $drift = max(0, round(time() - $data->time_us / 1000000));
                $this->keyValue->set('jetstream_drift', $drift);
 
-               if ($drift > 60 && !$this->capped) {
+               if ($drift > self::MAX_DRIFT_DID_CAP && !$this->capped) {
                        $this->capped = true;
                        $this->setOptions();
                        $this->logger->notice('Drift is too high, dids will be capped');
@@ -389,7 +406,9 @@ class Jetstream
                                break;
 
                        case 'create':
-                               $this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > 30));
+                               if ($drift < self::MAX_DRIFT_CREATE_POSTS) {
+                                       $this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION));
+                               }
                                break;
 
                        default:
@@ -413,7 +432,9 @@ class Jetstream
                                break;
 
                        case 'create':
-                               $this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > 30));
+                               if ($drift < self::MAX_DRIFT_CREATE_POSTS) {
+                                       $this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION));
+                               }
                                break;
 
                        default:
index 1ac2a9f00cce820e8c13c79bc31ff7401b323a8a..e073057358461bfd0c0e5cd5d7c71b72377466aa 100755 (executable)
@@ -151,7 +151,7 @@ class Processor
 
                        if (!empty($root)) {
                                $item['parent-uri'] = $root;
-                               $item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, false, Conversation::PARCEL_JETSTREAM);
+                               $item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, !$dont_fetch, Conversation::PARCEL_JETSTREAM);
                                $item['gravity']    = Item::GRAVITY_COMMENT;
                        } else {
                                $item['gravity'] = Item::GRAVITY_PARENT;
@@ -201,7 +201,7 @@ class Processor
                        $item['gravity']    = Item::GRAVITY_ACTIVITY;
                        $item['body']       = $item['verb'] = Activity::ANNOUNCE;
                        $item['thr-parent'] = $this->getUri($data->commit->record->subject);
-                       $item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], false, Conversation::PARCEL_JETSTREAM);
+                       $item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], !$dont_fetch, Conversation::PARCEL_JETSTREAM);
 
                        $id = Item::insert($item);
 
@@ -694,14 +694,14 @@ class Processor
                return $restrict ? Item::CANT_REPLY : null;
        }
 
-       public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $always_fetch = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
+       public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $complete_thread = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
        {
                $timestamp = microtime(true);
                $stamp     = Strings::getRandomHex(30);
                $this->logger->debug('Fetch missing post', ['uri' => $uri, 'stamp' => $stamp]);
 
                $fetched_uri = $this->getPostUri($uri, $uid);
-               if (!$always_fetch && !empty($fetched_uri)) {
+               if (!$complete_thread && !empty($fetched_uri)) {
                        return $fetched_uri;
                }
 
@@ -735,13 +735,13 @@ class Processor
                        $causer = Contact::getPublicContactId($causer, $uid);
                }
 
-               if (!empty($data->thread->parent)) {
+               if (!empty($data->thread->parent) && $complete_thread) {
                        $parents = $this->fetchParents($data->thread->parent, $uid);
 
                        if (!empty($parents)) {
                                if ($data->thread->post->record->reply->root->uri != $parents[0]->uri) {
                                        $parent_uri = $this->getUri($data->thread->post->record->reply->root);
-                                       $this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, false, $Protocol);
+                                       $this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, $complete_thread, $Protocol);
                                }
                        }
 
@@ -751,7 +751,7 @@ class Processor
                        }
                }
 
-               $uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol);
+               $uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol, $complete_thread);
                if (microtime(true) - $timestamp > 2) {
                        $this->logger->debug('Fetched and processed post', ['duration' => round(microtime(true) - $timestamp, 3), 'uri' => $uri, 'stamp' => $stamp]);
                }
@@ -771,7 +771,7 @@ class Processor
                return $parents;
        }
 
-       private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol): string
+       private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol, bool $complete_thread): string
        {
                if (empty($thread->post)) {
                        $this->logger->info('Invalid post', ['post' => $thread]);
@@ -794,9 +794,11 @@ class Processor
                        $uri = $fetched_uri;
                }
 
-               foreach ($thread->replies ?? [] as $reply) {
-                       $reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol);
-                       $this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]);
+               if ($complete_thread) {
+                       foreach ($thread->replies ?? [] as $reply) {
+                               $reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol, $complete_thread);
+                               $this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]);
+                       }
                }
 
                return $uri;