+ Logger::debug('Delete inbox-entry', ['id' => $entry['id']]);
+
+ DBA::delete('inbox-entry', ['id' => $entry['id']]);
+
+ $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
+ while ($child = DBA::fetch($children)) {
+ self::deleteById($child['id']);
+ }
+ DBA::close($children);
+ }
+
+ /**
+ * Set the worker id for the queue entry
+ *
+ * @param int $entry_id
+ * @param int $wid
+ * @return void
+ */
+ public static function setWorkerId(int $entry_id, int $wid)
+ {
+ if (empty($entry_id) || empty($wid)) {
+ return;
+ }
+ DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entry_id]);
+ }
+
+ /**
+ * Check if there is an assigned worker task
+ *
+ * @param int $wid
+ *
+ * @return bool
+ */
+ public static function hasWorker(int $wid): bool
+ {
+ if (empty($wid)) {
+ return false;
+ }
+ return DBA::exists('workerqueue', ['id' => $wid, 'done' => false]);
+ }
+
+ /**
+ * Process the activity with the given id
+ *
+ * @param integer $id
+ * @param bool $fetch_parents
+ *
+ * @return bool
+ */
+ public static function process(int $id, bool $fetch_parents = true): bool
+ {
+ $entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
+ if (empty($entry)) {
+ return false;
+ }
+
+ if (!self::isProcessable($id)) {
+ Logger::debug('Other queue entries need to be processed first.', ['id' => $id]);
+ return false;
+ }
+
+ if (!empty($entry['wid'])) {
+ $worker = DI::app()->getQueue();
+ $wid = $worker['id'] ?? 0;
+ if ($entry['wid'] != $wid) {
+ $workerqueue = DBA::selectFirst('workerqueue', ['pid'], ['id' => $entry['wid'], 'done' => false]);
+ if (!empty($workerqueue['pid']) && posix_kill($workerqueue['pid'], 0)) {
+ Logger::notice('Entry is already processed via another process.', ['current' => $wid, 'processor' => $entry['wid']]);
+ return false;
+ }
+ }
+ }
+
+ Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id'], 'callstack' => System::callstack(20)]);