<?php
/**
- * @copyright Copyright (C) 2010-2022, the Friendica project
+ * @copyright Copyright (C) 2010-2023, the Friendica project
*
* @license GNU AGPL version 3 or any later version
*
namespace Friendica\Protocol\ActivityPub;
use Friendica\Core\Logger;
+use Friendica\Core\System;
use Friendica\Database\Database;
use Friendica\Database\DBA;
use Friendica\DI;
+use Friendica\Model\ItemURI;
use Friendica\Model\Post;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\JsonLD;
return;
}
+ Logger::debug('Delete inbox-entry', ['id' => $entry['id']]);
+
DBA::delete('inbox-entry', ['id' => $entry['id']]);
+
+ $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
+ while ($child = DBA::fetch($children)) {
+ self::deleteById($child['id']);
+ }
+ DBA::close($children);
}
/**
return false;
}
+ if (!self::isProcessable($id)) {
+ Logger::debug('Other queue entries need to be processed first.', ['id' => $id]);
+ return false;
+ }
+
if (!empty($entry['wid'])) {
$worker = DI::app()->getQueue();
$wid = $worker['id'] ?? 0;
}
}
- Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
+ Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id'], 'callstack' => System::callstack(20)]);
$activity = json_decode($entry['activity'], true);
$type = $entry['type'];
$activity['worker-id'] = $entry['wid'];
$activity['recursion-depth'] = 0;
+ if (empty($activity['thread-children-type'])) {
+ $activity['thread-children-type'] = $type;
+ }
+
$receivers = DBA::select('inbox-entry-receiver', ['uid'], ["`queue-id` = ? AND `uid` != ?", $entry['id'], 0]);
while ($receiver = DBA::fetch($receivers)) {
if (!in_array($receiver['uid'], $activity['receiver'])) {
}
DBA::close($receivers);
- if (!Receiver::routeActivities($activity, $type, $push, $fetch_parents)) {
+ if (!Receiver::routeActivities($activity, $type, $push, $fetch_parents, $activity['receiver'][0] ?? 0)) {
self::remove($activity);
}
DBA::close($entries);
}
+ public static function isProcessable(int $id): bool
+ {
+ $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
+ if (empty($entry)) {
+ return false;
+ }
+
+ if (!empty($entry['object-id']) && Post::exists(['uri' => $entry['object-id']])) {
+ // The object already exists, so processing can be done
+ return true;
+ }
+
+ if (!empty($entry['conversation'])) {
+ $conv_id = ItemURI::getIdByURI($entry['conversation'], false);
+ if (DBA::exists('post-thread', ['conversation-id' => $conv_id])) {
+ // We have got the conversation in the system, so the post can be processed
+ return true;
+ }
+ }
+
+ if (!empty($entry['object-id']) && !empty($entry['in-reply-to-id']) && ($entry['object-id'] != $entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
+ // This entry belongs to some other entry that should be processed first
+ return false;
+ }
+
+ return true;
+ }
+
/**
* Clear old activities
*
{
// We delete all entries that aren't associated with a worker entry after seven days.
// The other entries are deleted when the worker deferred for too long.
- DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
+ $entries = DBA::select('inbox-entry', ['id'], ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);
+ while ($entry = DBA::fetch($entries)) {
+ self::deleteById($entry['id']);
+ }
+ DBA::close($entries);
// Optimizing this table only last seconds
if (DI::config()->get('system', 'optimize_tables')) {