From 13ade6a4e7937c1fa1a042eae4ac048b19b84406 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 12 Jun 2025 02:42:18 +0000 Subject: [PATCH] Jetstream: Only complete threads when the drift allows it --- src/Protocol/ATProtocol/Jetstream.php | 27 ++++++++++++++++++++++++--- src/Protocol/ATProtocol/Processor.php | 24 +++++++++++++----------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/Protocol/ATProtocol/Jetstream.php b/src/Protocol/ATProtocol/Jetstream.php index a4771cae44..128bf888c9 100755 --- a/src/Protocol/ATProtocol/Jetstream.php +++ b/src/Protocol/ATProtocol/Jetstream.php @@ -38,6 +38,23 @@ use stdClass; */ class Jetstream { + /** + * Maximum drift values in seconds for the threads completion. + * If the drift is higher than this value, only a few posts in a thread will be fetched. + */ + const MAX_DRIFT_THREAD_COMPLETION = 30; + /** + * Maximum drift values in seconds for the DID cap. + * If the drift is higher than this value, the number of DIDs will be capped. + */ + const MAX_DRIFT_DID_CAP = 60; + /** + * Maximum drift values in seconds for creating posts. + * If the drift is higher than this value, posts and reshares will not be created. + * The other collections will still be processed. + */ + const MAX_DRIFT_CREATE_POSTS = 1200; + private $uids = []; private $self = []; private $capped = false; @@ -362,7 +379,7 @@ class Jetstream $drift = max(0, round(time() - $data->time_us / 1000000)); $this->keyValue->set('jetstream_drift', $drift); - if ($drift > 60 && !$this->capped) { + if ($drift > self::MAX_DRIFT_DID_CAP && !$this->capped) { $this->capped = true; $this->setOptions(); $this->logger->notice('Drift is too high, dids will be capped'); @@ -389,7 +406,9 @@ class Jetstream break; case 'create': - $this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > 30)); + if ($drift < self::MAX_DRIFT_CREATE_POSTS) { + $this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION)); + } break; default: @@ -413,7 +432,9 @@ class Jetstream break; case 'create': - $this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > 30)); + if ($drift < self::MAX_DRIFT_CREATE_POSTS) { + $this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION)); + } break; default: diff --git a/src/Protocol/ATProtocol/Processor.php b/src/Protocol/ATProtocol/Processor.php index 1ac2a9f00c..e073057358 100755 --- a/src/Protocol/ATProtocol/Processor.php +++ b/src/Protocol/ATProtocol/Processor.php @@ -151,7 +151,7 @@ class Processor if (!empty($root)) { $item['parent-uri'] = $root; - $item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, false, Conversation::PARCEL_JETSTREAM); + $item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, !$dont_fetch, Conversation::PARCEL_JETSTREAM); $item['gravity'] = Item::GRAVITY_COMMENT; } else { $item['gravity'] = Item::GRAVITY_PARENT; @@ -201,7 +201,7 @@ class Processor $item['gravity'] = Item::GRAVITY_ACTIVITY; $item['body'] = $item['verb'] = Activity::ANNOUNCE; $item['thr-parent'] = $this->getUri($data->commit->record->subject); - $item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], false, Conversation::PARCEL_JETSTREAM); + $item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], !$dont_fetch, Conversation::PARCEL_JETSTREAM); $id = Item::insert($item); @@ -694,14 +694,14 @@ class Processor return $restrict ? Item::CANT_REPLY : null; } - public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $always_fetch = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string + public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $complete_thread = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string { $timestamp = microtime(true); $stamp = Strings::getRandomHex(30); $this->logger->debug('Fetch missing post', ['uri' => $uri, 'stamp' => $stamp]); $fetched_uri = $this->getPostUri($uri, $uid); - if (!$always_fetch && !empty($fetched_uri)) { + if (!$complete_thread && !empty($fetched_uri)) { return $fetched_uri; } @@ -735,13 +735,13 @@ class Processor $causer = Contact::getPublicContactId($causer, $uid); } - if (!empty($data->thread->parent)) { + if (!empty($data->thread->parent) && $complete_thread) { $parents = $this->fetchParents($data->thread->parent, $uid); if (!empty($parents)) { if ($data->thread->post->record->reply->root->uri != $parents[0]->uri) { $parent_uri = $this->getUri($data->thread->post->record->reply->root); - $this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, false, $Protocol); + $this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, $complete_thread, $Protocol); } } @@ -751,7 +751,7 @@ class Processor } } - $uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol); + $uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol, $complete_thread); if (microtime(true) - $timestamp > 2) { $this->logger->debug('Fetched and processed post', ['duration' => round(microtime(true) - $timestamp, 3), 'uri' => $uri, 'stamp' => $stamp]); } @@ -771,7 +771,7 @@ class Processor return $parents; } - private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol): string + private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol, bool $complete_thread): string { if (empty($thread->post)) { $this->logger->info('Invalid post', ['post' => $thread]); @@ -794,9 +794,11 @@ class Processor $uri = $fetched_uri; } - foreach ($thread->replies ?? [] as $reply) { - $reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol); - $this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]); + if ($complete_thread) { + foreach ($thread->replies ?? [] as $reply) { + $reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol, $complete_thread); + $this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]); + } } return $uri; -- 2.39.5