*/
class Jetstream
{
+ /**
+ * Maximum drift values in seconds for the threads completion.
+ * If the drift is higher than this value, only a few posts in a thread will be fetched.
+ */
+ const MAX_DRIFT_THREAD_COMPLETION = 30;
+ /**
+ * Maximum drift values in seconds for the DID cap.
+ * If the drift is higher than this value, the number of DIDs will be capped.
+ */
+ const MAX_DRIFT_DID_CAP = 60;
+ /**
+ * Maximum drift values in seconds for creating posts.
+ * If the drift is higher than this value, posts and reshares will not be created.
+ * The other collections will still be processed.
+ */
+ const MAX_DRIFT_CREATE_POSTS = 1200;
+
private $uids = [];
private $self = [];
private $capped = false;
$drift = max(0, round(time() - $data->time_us / 1000000));
$this->keyValue->set('jetstream_drift', $drift);
- if ($drift > 60 && !$this->capped) {
+ if ($drift > self::MAX_DRIFT_DID_CAP && !$this->capped) {
$this->capped = true;
$this->setOptions();
$this->logger->notice('Drift is too high, dids will be capped');
break;
case 'create':
- $this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > 30));
+ if ($drift < self::MAX_DRIFT_CREATE_POSTS) {
+ $this->processor->createPost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION));
+ }
break;
default:
break;
case 'create':
- $this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > 30));
+ if ($drift < self::MAX_DRIFT_CREATE_POSTS) {
+ $this->processor->createRepost($data, $this->uids[$data->did] ?? [0], ($drift > self::MAX_DRIFT_THREAD_COMPLETION));
+ }
break;
default:
if (!empty($root)) {
$item['parent-uri'] = $root;
- $item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, false, Conversation::PARCEL_JETSTREAM);
+ $item['thr-parent'] = $this->fetchMissingPost($parent, $uid, Item::PR_FETCHED, $item['contact-id'], 0, $parent, !$dont_fetch, Conversation::PARCEL_JETSTREAM);
$item['gravity'] = Item::GRAVITY_COMMENT;
} else {
$item['gravity'] = Item::GRAVITY_PARENT;
$item['gravity'] = Item::GRAVITY_ACTIVITY;
$item['body'] = $item['verb'] = Activity::ANNOUNCE;
$item['thr-parent'] = $this->getUri($data->commit->record->subject);
- $item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], false, Conversation::PARCEL_JETSTREAM);
+ $item['thr-parent'] = $this->fetchMissingPost($item['thr-parent'], 0, Item::PR_FETCHED, $item['contact-id'], 0, $item['thr-parent'], !$dont_fetch, Conversation::PARCEL_JETSTREAM);
$id = Item::insert($item);
return $restrict ? Item::CANT_REPLY : null;
}
- public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $always_fetch = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
+ public function fetchMissingPost(string $uri, int $uid, int $post_reason, int $causer, int $level, string $fallback = '', bool $complete_thread = false, int $Protocol = Conversation::PARCEL_JETSTREAM): string
{
$timestamp = microtime(true);
$stamp = Strings::getRandomHex(30);
$this->logger->debug('Fetch missing post', ['uri' => $uri, 'stamp' => $stamp]);
$fetched_uri = $this->getPostUri($uri, $uid);
- if (!$always_fetch && !empty($fetched_uri)) {
+ if (!$complete_thread && !empty($fetched_uri)) {
return $fetched_uri;
}
$causer = Contact::getPublicContactId($causer, $uid);
}
- if (!empty($data->thread->parent)) {
+ if (!empty($data->thread->parent) && $complete_thread) {
$parents = $this->fetchParents($data->thread->parent, $uid);
if (!empty($parents)) {
if ($data->thread->post->record->reply->root->uri != $parents[0]->uri) {
$parent_uri = $this->getUri($data->thread->post->record->reply->root);
- $this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, false, $Protocol);
+ $this->fetchMissingPost($parent_uri, $uid, $post_reason, $causer, $level, $data->thread->post->record->reply->root->uri, $complete_thread, $Protocol);
}
}
}
}
- $uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol);
+ $uri = $this->processThread($data->thread, $uid, $post_reason, $causer, $level, $Protocol, $complete_thread);
if (microtime(true) - $timestamp > 2) {
$this->logger->debug('Fetched and processed post', ['duration' => round(microtime(true) - $timestamp, 3), 'uri' => $uri, 'stamp' => $stamp]);
}
return $parents;
}
- private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol): string
+ private function processThread(stdClass $thread, int $uid, int $post_reason, int $causer, int $level, int $protocol, bool $complete_thread): string
{
if (empty($thread->post)) {
$this->logger->info('Invalid post', ['post' => $thread]);
$uri = $fetched_uri;
}
- foreach ($thread->replies ?? [] as $reply) {
- $reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol);
- $this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]);
+ if ($complete_thread) {
+ foreach ($thread->replies ?? [] as $reply) {
+ $reply_uri = $this->processThread($reply, $uid, Item::PR_FETCHED, $causer, $level, $protocol, $complete_thread);
+ $this->logger->debug('Reply has been processed', ['uri' => $uri, 'reply' => $reply_uri]);
+ }
}
return $uri;