}
}
- if ($activity['target_id'] != $actor['featured']) {
- return null;
- }
-
- $id = Contact::getIdForURL($activity['actor']);
- if (empty($id)) {
- return null;
+ $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]);
+ if (empty($parent['uri-id'])) {
+ if (self::fetchMissingActivity($activity['object_id'], $activity, '', Receiver::COMPLETION_AUTO)) {
+ $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]);
+ }
}
- $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id'], 'author-id' => $id]);
if (!empty($parent['uri-id'])) {
return $parent['uri-id'];
}
return '';
}
- if (!empty($object['actor'])) {
- $object_actor = $object['actor'];
- } elseif (!empty($object['attributedTo'])) {
- $object_actor = $object['attributedTo'];
- if (is_array($object_actor)) {
+ $signer = [];
+
+ if (!empty($object['attributedTo'])) {
+ $attributed_to = $object['attributedTo'];
+ if (is_array($attributed_to)) {
$compacted = JsonLD::compact($object);
- $object_actor = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
+ $attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id');
}
+ $signer[] = $attributed_to;
+ }
+
+ if (!empty($object['actor'])) {
+ $object_actor = $object['actor'];
+ } elseif (!empty($attributed_to)) {
+ $object_actor = $attributed_to;
} else {
// Shouldn't happen
$object_actor = '';
}
- $signer = [$object_actor];
+ $signer[] = $object_actor;
if (!empty($child['author'])) {
$actor = $child['author'];
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
}
+ /**
+ * Delete all entries that depend on the given worker id
+ *
+ * @param integer $wid
+ * @return void
+ */
+ public static function deleteByWorkerId(int $wid)
+ {
+ $entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]);
+ while ($entry = DBA::fetch($entries)) {
+ self::deleteById($entry['id']);
+ }
+ DBA::close($entries);
+ }
+
+ /**
+ * Delete recursively an entry and all their children
+ *
+ * @param integer $id
+ * @return void
+ */
+ private static function deleteById(int $id)
+ {
+ $entry = DBA::selectFirst('inbox-entry', ['id', 'object-id'], ['id' => $id]);
+ if (empty($entry)) {
+ return;
+ }
+
+ $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
+ while ($child = DBA::fetch($children)) {
+ self::deleteById($child['id']);
+ }
+ DBA::close($children);
+ DBA::delete('inbox-entry', ['id' => $entry['id']]);
+ }
+
/**
* Set the worker id for the queue entry
*
$type = $entry['type'];
$push = $entry['push'];
- $activity['entry-id'] = $entry['id'];
- $activity['worker-id'] = $entry['wid'];
+ $activity['entry-id'] = $entry['id'];
+ $activity['worker-id'] = $entry['wid'];
+ $activity['recursion-depth'] = 0;
$receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]);
while ($receiver = DBA::fetch($receivers)) {
*/
public static function processAll()
{
- $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
+ $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`wid` IS NULL"], ['order' => ['id' => true]]);
while ($entry = DBA::fetch($entries)) {
+ // We don't need to process entries that depend on already existing entries.
+ if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) {
+ continue;
+ }
+ Logger::debug('Process leftover entry', $entry);
self::process($entry['id']);
}
}
}
}
- if (($type == 'as:Add') && is_array($activity['as:object']) && (count($activity['as:object']) == 1)) {
- $trust_source = false;
- }
-
// $trust_source is called by reference and is set to true if the content was retrieved successfully
$object_data = self::prepareObjectData($activity, $uid, $push, $trust_source);
if (empty($object_data)) {
$object_data['thread-children-type'] = $activity['thread-children-type'];
}
- if (!empty($activity['recursion-depth'])) {
- $object_data['recursion-depth'] = $activity['recursion-depth'];
- }
-
// Internal flag for posts that arrived via relay
if (!empty($activity['from-relay'])) {
$object_data['from-relay'] = $activity['from-relay'];
$object_data = Queue::add($object_data, $type, $uid, $http_signer, $push);
+ if (!empty($activity['recursion-depth'])) {
+ $object_data['recursion-depth'] = $activity['recursion-depth'];
+ }
+
if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) {
self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer);
}
if (DI::config()->get('system', 'optimize_tables')) {
Worker::add(PRIORITY_LOW, 'OptimizeTables');
}
-
- DI::config()->set('system', 'last_cron_daily', time());
+
+ // Process all unprocessed entries
+ Queue::processAll();
// Resubscribe to relay servers
Relay::reSubscribe();
+
+ DI::config()->set('system', 'last_cron_daily', time());
}
Logger::notice('end');
use Friendica\Core\Logger;
use Friendica\Core\Worker;
+use Friendica\DI;
use Friendica\Protocol\ActivityPub;
use Friendica\Protocol\ActivityPub\Queue;
use Friendica\Protocol\ActivityPub\Receiver;
/**
* Fetch missing activities
* @param string $url Contact URL
+ *
+ * @return void
*/
public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL)
{
$result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion);
if ($result) {
Logger::info('Successfully fetched missing activity', ['url' => $url]);
- Queue::processReplyByUri($url);
} elseif (!Worker::defer()) {
- // @todo perform recursive deletion of all entries
Logger::info('Activity could not be fetched', ['url' => $url]);
+
+ // recursively delete all entries that belong to this worker task
+ $queue = DI::app()->getQueue();
+ if (!empty($queue['id'])) {
+ Queue::deleteByWorkerId($queue['id']);
+ }
} else {
Logger::info('Fetching deferred', ['url' => $url]);
}