From dc16e6d471d1929fbc9c1c741feb51a2d2344d62 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 19 May 2022 19:24:21 +0000 Subject: [PATCH] The worker is split into several classes --- bin/daemon.php | 2 +- src/Core/Worker.php | 348 ++------------------------ src/Core/Worker/Cron.php | 192 ++++++++++++++ src/Core/Worker/Daemon.php | 137 ++++++++++ src/Core/Worker/IPC.php | 75 ++++++ src/Protocol/ActivityPub/Delivery.php | 161 ++++++++++++ src/Worker/APDelivery.php | 136 +--------- src/Worker/Cron.php | 3 - src/Worker/RequeuePosts.php | 47 ---- 9 files changed, 591 insertions(+), 510 deletions(-) create mode 100644 src/Core/Worker/Cron.php create mode 100644 src/Core/Worker/Daemon.php create mode 100644 src/Core/Worker/IPC.php create mode 100644 src/Protocol/ActivityPub/Delivery.php delete mode 100644 src/Worker/RequeuePosts.php diff --git a/bin/daemon.php b/bin/daemon.php index c7a16a9e8d..2173be1862 100755 --- a/bin/daemon.php +++ b/bin/daemon.php @@ -230,7 +230,7 @@ while (true) { } $timeout = ($seconds >= $wait_interval); - } while (!$timeout && !Worker::IPCJobsExists()); + } while (!$timeout && !Worker\IPC::JobsExists()); if ($timeout) { $do_cron = true; diff --git a/src/Core/Worker.php b/src/Core/Worker.php index e29cf765a4..bc52843e69 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -21,7 +21,6 @@ namespace Friendica\Core; -use Friendica\App\Mode; use Friendica\Core\Worker\Entity\Process; use Friendica\Database\DBA; use Friendica\DI; @@ -50,7 +49,6 @@ class Worker private static $lock_duration = 0; private static $last_update; private static $state; - private static $daemon_mode = null; /** @var Process */ private static $process; @@ -79,7 +77,7 @@ class Worker $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); if (time() > ($last_cleanup + 300)) { DI::config()->set('system', 'worker_last_cleaned', time()); - self::killStaleWorkers(); + Worker\Cron::killStaleWorkers(); } // Check if the system is ready @@ -89,7 +87,7 @@ class Worker // Now we start additional cron processes if we should do so if ($run_cron) { - self::runCron(); + Worker\Cron::run(); } $last_check = $starttime = time(); @@ -97,15 +95,13 @@ class Worker // We fetch the next queue entry that is about to be executed while ($r = self::workerProcess()) { - if (self::IPCJobsExists(getmypid())) { - self::IPCDeleteJobState(getmypid()); + if (Worker\IPC::JobsExists(getmypid())) { + Worker\IPC::DeleteJobState(getmypid()); } // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { - $entry = self::checkPriority($entry); - // The work will be done if (!self::execute($entry)) { Logger::notice('Process execution failed, quitting.'); @@ -150,8 +146,8 @@ class Worker if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); self::unclaimProcess($process); - if (self::isDaemonMode()) { - self::IPCSetJobState(true); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(true); } else { self::spawnWorker(); } @@ -160,30 +156,12 @@ class Worker } // Cleaning up. Possibly not needed, but it doesn't harm anything. - if (self::isDaemonMode()) { - self::IPCSetJobState(false); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(false); } Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); } - /** - * Check and fix the priority of a worker task - * @param array $entry - * @return array - */ - private static function checkPriority(array $entry) - { - $entry['priority'] = (int)$entry['priority']; - - if (!in_array($entry['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); - DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]); - $entry['priority'] = PRIORITY_MEDIUM; - } - - return $entry; - } - /** * Checks if the system is ready. * @@ -642,85 +620,6 @@ class Worker return true; } - /** - * fix the queue entry if the worker process died - * - * @return void - * @throws \Exception - */ - private static function killStaleWorkers() - { - $stamp = (float)microtime(true); - $entries = DBA::select( - 'workerqueue', - ['id', 'pid', 'executed', 'priority', 'command', 'parameter'], - ['NOT `done` AND `pid` != 0'], - ['order' => ['priority', 'retrial', 'created']] - ); - self::$db_duration += (microtime(true) - $stamp); - - while ($entry = DBA::fetch($entries)) { - $entry = self::checkPriority($entry); - - if (!posix_kill($entry["pid"], 0)) { - $stamp = (float)microtime(true); - DBA::update( - 'workerqueue', - ['executed' => DBA::NULL_DATETIME, 'pid' => 0], - ['id' => $entry["id"]] - ); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } else { - // Kill long running processes - - // Define the maximum durations - $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; - $max_duration = $max_duration_defaults[$entry['priority']]; - - $argv = json_decode($entry['parameter'], true); - if (!empty($entry['command'])) { - $command = $entry['command']; - } elseif (!empty($argv)) { - $command = array_shift($argv); - } else { - return; - } - - $command = basename($command); - - // How long is the process already running? - $duration = (time() - strtotime($entry["executed"])) / 60; - if ($duration > $max_duration) { - Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]); - posix_kill($entry["pid"], SIGTERM); - - // We killed the stale process. - // To avoid a blocking situation we reschedule the process at the beginning of the queue. - // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) - $new_priority = $entry['priority']; - if ($entry['priority'] == PRIORITY_HIGH) { - $new_priority = PRIORITY_MEDIUM; - } elseif ($entry['priority'] == PRIORITY_MEDIUM) { - $new_priority = PRIORITY_LOW; - } elseif ($entry['priority'] != PRIORITY_CRITICAL) { - $new_priority = PRIORITY_NEGLIGIBLE; - } - $stamp = (float)microtime(true); - DBA::update( - 'workerqueue', - ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0], - ['id' => $entry["id"]] - ); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } else { - Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]); - } - } - } - DBA::close($entries); - } /** * Checks if the number of active workers exceeds the given limits @@ -830,8 +729,8 @@ class Worker // Are there fewer workers running as possible? Then fork a new one. if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) { Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]); - if (self::isDaemonMode()) { - self::IPCSetJobState(true); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(true); } else { self::spawnWorker(); } @@ -839,8 +738,8 @@ class Worker } // if there are too much worker, we don't spawn a new one. - if (self::isDaemonMode() && ($active > $queues)) { - self::IPCSetJobState(false); + if (Worker\Daemon::isMode() && ($active > $queues)) { + Worker\IPC::SetJobState(false); } return $active > $queues; @@ -1131,52 +1030,6 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } - /** - * Runs the cron processes - * - * @return void - * @throws \Friendica\Network\HTTPException\InternalServerErrorException - */ - private static function runCron() - { - Logger::info('Add cron entries'); - - // Check for spooled items - self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost'); - - // Run the cron job that calls all other jobs - self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron'); - - // Cleaning dead processes - self::killStaleWorkers(); - - // Remove old entries from the workerqueue - self::cleanWorkerQueue(); - } - - /** - * Remove old entries from the workerqueue - * - * @return void - */ - private static function cleanWorkerQueue() - { - DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]); - - // Optimizing this table only last seconds - if (DI::config()->get('system', 'optimize_tables')) { - // We are acquiring the two locks from the worker to avoid locking problems - if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) { - if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) { - DBA::e("OPTIMIZE TABLE `workerqueue`"); - DBA::e("OPTIMIZE TABLE `process`"); - DI::lock()->release(Worker::LOCK_WORKER); - } - DI::lock()->release(Worker::LOCK_PROCESS); - } - } - } - /** * Fork a child process * @@ -1202,11 +1055,11 @@ class Worker // The parent process continues here DBA::connect(); - self::IPCSetJobState(true, $pid); + Worker\IPC::SetJobState(true, $pid); Logger::info('Spawned new worker', ['pid' => $pid]); $cycles = 0; - while (self::IPCJobsExists($pid) && (++$cycles < 100)) { + while (Worker\IPC::JobsExists($pid) && (++$cycles < 100)) { usleep(10000); } @@ -1221,7 +1074,7 @@ class Worker $process = DI::process()->create(getmypid(), basename(__FILE__)); $cycles = 0; - while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) { + while (!Worker\IPC::JobsExists($process->pid) && (++$cycles < 100)) { usleep(10000); } @@ -1231,7 +1084,7 @@ class Worker self::unclaimProcess($process); - self::IPCSetJobState(false, $process->pid); + Worker\IPC::SetJobState(false, $process->pid); DI::process()->delete($process); Logger::info('Worker ended', ['pid' => $process->pid]); exit(); @@ -1246,13 +1099,13 @@ class Worker */ public static function spawnWorker($do_cron = false) { - if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { + if (Worker\Daemon::isMode() && DI::config()->get('system', 'worker_fork')) { self::forkProcess($do_cron); } else { DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]); } - if (self::isDaemonMode()) { - self::IPCSetJobState(false); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(false); } } @@ -1343,11 +1196,11 @@ class Worker } // Set the IPC flag to ensure an immediate process execution via daemon - if (self::isDaemonMode()) { - self::IPCSetJobState(true); + if (Worker\Daemon::isMode()) { + Worker\IPC::SetJobState(true); } - self::checkDaemonState(); + Worker\Daemon::checkState(); // Should we quit and wait for the worker to be called as a cronjob? if ($dont_fork) { @@ -1368,7 +1221,7 @@ class Worker } // Quit on daemon mode - if (self::isDaemonMode()) { + if (Worker\Daemon::isMode()) { return $added; } @@ -1423,8 +1276,6 @@ class Worker return false; } - $queue = self::checkPriority($queue); - $id = $queue['id']; $priority = $queue['priority']; @@ -1460,159 +1311,6 @@ class Worker return true; } - /** - * Set the flag if some job is waiting - * - * @param boolean $jobs Is there a waiting job? - * @param int $key Key number - * @throws \Exception - */ - public static function IPCSetJobState(bool $jobs, int $key = 0) - { - $stamp = (float)microtime(true); - DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } - - /** - * Delete a key entry - * - * @param int $key Key number - * @throws \Exception - */ - public static function IPCDeleteJobState(int $key) - { - $stamp = (float)microtime(true); - DBA::delete('worker-ipc', ['key' => $key]); - self::$db_duration += (microtime(true) - $stamp); - self::$db_duration_write += (microtime(true) - $stamp); - } - - /** - * Checks if some worker job waits to be executed - * - * @param int $key Key number - * @return bool - * @throws \Exception - */ - public static function IPCJobsExists(int $key = 0) - { - $stamp = (float)microtime(true); - $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]); - self::$db_duration += (microtime(true) - $stamp); - - // When we don't have a row, no job is running - if (!DBA::isResult($row)) { - return false; - } - - return (bool)$row['jobs']; - } - - /** - * Checks if the worker is running in the daemon mode. - * - * @return boolean - */ - public static function isDaemonMode() - { - if (!is_null(self::$daemon_mode)) { - return self::$daemon_mode; - } - - if (DI::mode()->getExecutor() == Mode::DAEMON) { - return true; - } - - $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true); - if ($daemon_mode) { - return $daemon_mode; - } - - if (!function_exists('pcntl_fork')) { - self::$daemon_mode = false; - return false; - } - - $pidfile = DI::config()->get('system', 'pidfile'); - if (empty($pidfile)) { - // No pid file, no daemon - self::$daemon_mode = false; - return false; - } - - if (!is_readable($pidfile)) { - // No pid file. We assume that the daemon had been intentionally stopped. - self::$daemon_mode = false; - return false; - } - - $pid = intval(file_get_contents($pidfile)); - $running = posix_kill($pid, 0); - - self::$daemon_mode = $running; - return $running; - } - - /** - * Test if the daemon is running. If not, it will be started - * - * @return void - */ - private static function checkDaemonState() - { - if (!DI::config()->get('system', 'daemon_watchdog', false)) { - return; - } - - if (!DI::mode()->isNormal()) { - return; - } - - // Check every minute if the daemon is running - if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) { - return; - } - - DI::config()->set('system', 'last_daemon_check', time()); - - $pidfile = DI::config()->get('system', 'pidfile'); - if (empty($pidfile)) { - // No pid file, no daemon - return; - } - - if (!is_readable($pidfile)) { - // No pid file. We assume that the daemon had been intentionally stopped. - return; - } - - $pid = intval(file_get_contents($pidfile)); - if (posix_kill($pid, 0)) { - Logger::info('Daemon process is running', ['pid' => $pid]); - return; - } - - Logger::warning('Daemon process is not running', ['pid' => $pid]); - - self::spawnDaemon(); - } - - /** - * Spawn a new daemon process - * - * @return void - */ - private static function spawnDaemon() - { - Logger::notice('Starting new daemon process'); - $command = 'bin/daemon.php'; - $a = DI::app(); - DI::system()->run($command, ['start']); - Logger::notice('New daemon process started'); - } - /** * Check if the system is inside the defined maintenance window * diff --git a/src/Core/Worker/Cron.php b/src/Core/Worker/Cron.php new file mode 100644 index 0000000000..b07c472750 --- /dev/null +++ b/src/Core/Worker/Cron.php @@ -0,0 +1,192 @@ +. + * + */ + +namespace Friendica\Core\Worker; + +use Friendica\Core\Logger; +use Friendica\Core\Worker; +use Friendica\Database\DBA; +use Friendica\DI; +use Friendica\Model\Post; +use Friendica\Protocol\ActivityPub; +use Friendica\Util\DateTimeFormat; + +/** + * Contains the class for jobs that are executed in an interval + */ +class Cron +{ + /** + * Runs the cron processes + * + * @return void + * @throws \Friendica\Network\HTTPException\InternalServerErrorException + */ + public static function run() + { + Logger::info('Add cron entries'); + + // Check for spooled items + Worker::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost'); + + // Run the cron job that calls all other jobs + Worker::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron'); + + // Cleaning dead processes + self::killStaleWorkers(); + + // Remove old entries from the workerqueue + self::cleanWorkerQueue(); + + // Directly deliver or requeue posts + self::deliverPosts(); + } + + /** + * fix the queue entry if the worker process died + * + * @return void + * @throws \Exception + */ + public static function killStaleWorkers() + { + $stamp = (float)microtime(true); + $entries = DBA::select( + 'workerqueue', + ['id', 'pid', 'executed', 'priority', 'command', 'parameter'], + ['NOT `done` AND `pid` != 0'], + ['order' => ['priority', 'retrial', 'created']] + ); + + while ($entry = DBA::fetch($entries)) { + if (!posix_kill($entry["pid"], 0)) { + $stamp = (float)microtime(true); + DBA::update( + 'workerqueue', + ['executed' => DBA::NULL_DATETIME, 'pid' => 0], + ['id' => $entry["id"]] + ); + } else { + // Kill long running processes + + // Define the maximum durations + $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; + $max_duration = $max_duration_defaults[$entry['priority']]; + + $argv = json_decode($entry['parameter'], true); + if (!empty($entry['command'])) { + $command = $entry['command']; + } elseif (!empty($argv)) { + $command = array_shift($argv); + } else { + return; + } + + $command = basename($command); + + // How long is the process already running? + $duration = (time() - strtotime($entry["executed"])) / 60; + if ($duration > $max_duration) { + Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]); + posix_kill($entry["pid"], SIGTERM); + + // We killed the stale process. + // To avoid a blocking situation we reschedule the process at the beginning of the queue. + // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) + $new_priority = $entry['priority']; + if ($entry['priority'] == PRIORITY_HIGH) { + $new_priority = PRIORITY_MEDIUM; + } elseif ($entry['priority'] == PRIORITY_MEDIUM) { + $new_priority = PRIORITY_LOW; + } elseif ($entry['priority'] != PRIORITY_CRITICAL) { + $new_priority = PRIORITY_NEGLIGIBLE; + } + $stamp = (float)microtime(true); + DBA::update( + 'workerqueue', + ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0], + ['id' => $entry["id"]] + ); + } else { + Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]); + } + } + } + DBA::close($entries); + } + + /** + * Remove old entries from the workerqueue + * + * @return void + */ + private static function cleanWorkerQueue() + { + DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]); + + // Optimizing this table only last seconds + if (DI::config()->get('system', 'optimize_tables')) { + // We are acquiring the two locks from the worker to avoid locking problems + if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) { + if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) { + DBA::e("OPTIMIZE TABLE `workerqueue`"); + DBA::e("OPTIMIZE TABLE `process`"); + DI::lock()->release(Worker::LOCK_WORKER); + } + DI::lock()->release(Worker::LOCK_PROCESS); + } + } + } + + /** + * Directly deliver AP messages or requeue them. + * + * This function is placed here as a safeguard. Even when the worker queue is completely blocked, messages will be delivered. + */ + private static function deliverPosts() + { + $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox`, MAX(`failed`) AS `failed` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`"); + while ($delivery = DBA::fetch($deliveries)) { + if ($delivery['failed'] == 0) { + $result = ActivityPub\Delivery::deliver($delivery['inbox']); + Logger::info('Drectly deliver inbox', ['inbox' => $delivery['inbox'], 'result' => $result['success']]); + continue; + } elseif ($delivery['failed'] < 3) { + $priority = PRIORITY_HIGH; + } elseif ($delivery['failed'] < 6) { + $priority = PRIORITY_MEDIUM; + } elseif ($delivery['failed'] < 8) { + $priority = PRIORITY_LOW; + } { + $priority = PRIORITY_NEGLIGIBLE; + } + + if ($delivery['failed'] >= DI::config()->get('system', 'worker_defer_limit')) { + Logger::info('Removing failed deliveries', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed']]); + Post\Delivery::removeFailed($delivery['inbox']); + } + + if (Worker::add($priority, 'APDelivery', '', 0, $delivery['inbox'], 0)) { + Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox'], 'failed' => $delivery['failed'], 'priority' => $priority]); + } + } + } +} diff --git a/src/Core/Worker/Daemon.php b/src/Core/Worker/Daemon.php new file mode 100644 index 0000000000..afc6cda4ec --- /dev/null +++ b/src/Core/Worker/Daemon.php @@ -0,0 +1,137 @@ +. + * + */ + +namespace Friendica\Core\Worker; + +use Friendica\App\Mode; +use Friendica\Core\Logger; +use Friendica\DI; + +/** + * Contains the class for the worker background job processing + */ +class Daemon +{ + private static $daemon_mode = null; + + /** + * Checks if the worker is running in the daemon mode. + * + * @return boolean + */ + public static function isMode() + { + if (!is_null(self::$daemon_mode)) { + return self::$daemon_mode; + } + + if (DI::mode()->getExecutor() == Mode::DAEMON) { + return true; + } + + $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true); + if ($daemon_mode) { + return $daemon_mode; + } + + if (!function_exists('pcntl_fork')) { + self::$daemon_mode = false; + return false; + } + + $pidfile = DI::config()->get('system', 'pidfile'); + if (empty($pidfile)) { + // No pid file, no daemon + self::$daemon_mode = false; + return false; + } + + if (!is_readable($pidfile)) { + // No pid file. We assume that the daemon had been intentionally stopped. + self::$daemon_mode = false; + return false; + } + + $pid = intval(file_get_contents($pidfile)); + $running = posix_kill($pid, 0); + + self::$daemon_mode = $running; + return $running; + } + + /** + * Test if the daemon is running. If not, it will be started + * + * @return void + */ + public static function checkState() + { + if (!DI::config()->get('system', 'daemon_watchdog', false)) { + return; + } + + if (!DI::mode()->isNormal()) { + return; + } + + // Check every minute if the daemon is running + if (DI::config()->get('system', 'last_daemon_check', 0) + 60 > time()) { + return; + } + + DI::config()->set('system', 'last_daemon_check', time()); + + $pidfile = DI::config()->get('system', 'pidfile'); + if (empty($pidfile)) { + // No pid file, no daemon + return; + } + + if (!is_readable($pidfile)) { + // No pid file. We assume that the daemon had been intentionally stopped. + return; + } + + $pid = intval(file_get_contents($pidfile)); + if (posix_kill($pid, 0)) { + Logger::info('Daemon process is running', ['pid' => $pid]); + return; + } + + Logger::warning('Daemon process is not running', ['pid' => $pid]); + + self::spawn(); + } + + /** + * Spawn a new daemon process + * + * @return void + */ + private static function spawn() + { + Logger::notice('Starting new daemon process'); + $command = 'bin/daemon.php'; + $a = DI::app(); + DI::system()->run($command, ['start']); + Logger::notice('New daemon process started'); + } +} diff --git a/src/Core/Worker/IPC.php b/src/Core/Worker/IPC.php new file mode 100644 index 0000000000..a10cf1cd0c --- /dev/null +++ b/src/Core/Worker/IPC.php @@ -0,0 +1,75 @@ +. + * + */ + +namespace Friendica\Core\Worker; + +use Friendica\Database\DBA; + +/** + * Contains the class for the inter process communication + */ +class IPC +{ + /** + * Set the flag if some job is waiting + * + * @param boolean $jobs Is there a waiting job? + * @param int $key Key number + * @throws \Exception + */ + public static function SetJobState(bool $jobs, int $key = 0) + { + $stamp = (float)microtime(true); + DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]); + } + + /** + * Delete a key entry + * + * @param int $key Key number + * @throws \Exception + */ + public static function DeleteJobState(int $key) + { + $stamp = (float)microtime(true); + DBA::delete('worker-ipc', ['key' => $key]); + } + + /** + * Checks if some worker job waits to be executed + * + * @param int $key Key number + * @return bool + * @throws \Exception + */ + public static function JobsExists(int $key = 0) + { + $stamp = (float)microtime(true); + $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]); + + // When we don't have a row, no job is running + if (!DBA::isResult($row)) { + return false; + } + + return (bool)$row['jobs']; + } +} diff --git a/src/Protocol/ActivityPub/Delivery.php b/src/Protocol/ActivityPub/Delivery.php new file mode 100644 index 0000000000..29fe21dd06 --- /dev/null +++ b/src/Protocol/ActivityPub/Delivery.php @@ -0,0 +1,161 @@ +. + * + */ + +namespace Friendica\Protocol\ActivityPub; + +use Friendica\Core\Logger; +use Friendica\DI; +use Friendica\Model\Contact; +use Friendica\Model\GServer; +use Friendica\Model\Post; +use Friendica\Protocol\ActivityPub; +use Friendica\Util\HTTPSignature; +use Friendica\Worker\Delivery as WorkerDelivery; + +class Delivery +{ + public static function deliver(string $inbox):array + { + $uri_ids = []; + $posts = Post\Delivery::selectForInbox($inbox); + $serverfail = false; + + foreach ($posts as $post) { + if (!$serverfail) { + $result = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id']); + + if ($result['serverfailure']) { + // In a timeout situation we assume that every delivery to that inbox will time out. + // So we set the flag and try all deliveries at a later time. + Logger::info('Inbox delivery has a server failure', ['inbox' => $inbox]); + $serverfail = true; + } + } + + if ($serverfail || !$result['success']) { + $uri_ids[] = $post['uri-id']; + } + } + + Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids), 'serverfailure' => $serverfail]); + return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids]; + } + + public static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array + { + if (empty($item_id) && !empty($uri_id) && !empty($uid)) { + $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]); + if (empty($item['id'])) { + Logger::notice('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]); + Post\Delivery::remove($uri_id, $inbox); + return true; + } else { + $item_id = $item['id']; + } + } + + $success = true; + $serverfail = false; + + if ($cmd == WorkerDelivery::MAIL) { + $data = ActivityPub\Transmitter::createActivityFromMail($item_id); + if (!empty($data)) { + $success = HTTPSignature::transmit($data, $inbox, $uid); + } + } elseif ($cmd == WorkerDelivery::SUGGESTION) { + $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id); + } elseif ($cmd == WorkerDelivery::RELOCATION) { + // @todo Implementation pending + } elseif ($cmd == WorkerDelivery::POKE) { + // Implementation not planned + } elseif ($cmd == WorkerDelivery::REMOVAL) { + $success = ActivityPub\Transmitter::sendProfileDeletion($uid, $inbox); + } elseif ($cmd == WorkerDelivery::PROFILEUPDATE) { + $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox); + } else { + $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id); + if (!empty($data)) { + $timestamp = microtime(true); + $response = HTTPSignature::post($data, $inbox, $uid); + $runtime = microtime(true) - $timestamp; + $success = $response->isSuccess(); + $serverfail = $response->isTimeout(); + if (!$success) { + if (!$serverfail && ($response->getReturnCode() >= 500) && ($response->getReturnCode() <= 599)) { + $serverfail = true; + } + + $xrd_timeout = DI::config()->get('system', 'xrd_timeout'); + if (!$serverfail && $xrd_timeout && ($runtime > $xrd_timeout)) { + $serverfail = true; + } + $curl_timeout = DI::config()->get('system', 'curl_timeout'); + if (!$serverfail && $curl_timeout && ($runtime > $curl_timeout)) { + $serverfail = true; + } + + Logger::info('Delivery failed', ['retcode' => $response->getReturnCode(), 'serverfailure' => $serverfail, 'runtime' => round($runtime, 3), 'uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox]); + } + if ($uri_id) { + if ($success) { + Post\Delivery::remove($uri_id, $inbox); + } else { + Post\Delivery::incrementFailed($uri_id, $inbox); + } + } + } + } + + self::setSuccess($receivers, $success); + + Logger::debug('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]); + + if ($success && in_array($cmd, [WorkerDelivery::POST])) { + Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB); + } + + return ['success' => $success, 'serverfailure' => $serverfail]; + } + + private static function setSuccess(array $receivers, bool $success) + { + $gsid = null; + + foreach ($receivers as $receiver) { + $contact = Contact::getById($receiver); + if (empty($contact)) { + continue; + } + + $gsid = $gsid ?: $contact['gsid']; + + if ($success) { + Contact::unmarkForArchival($contact); + } else { + Contact::markForArchival($contact); + } + } + + if (!empty($gsid)) { + GServer::setProtocol($gsid, Post\DeliveryData::ACTIVITYPUB); + } + } +} diff --git a/src/Worker/APDelivery.php b/src/Worker/APDelivery.php index 051bde0a0c..4703b8ca16 100644 --- a/src/Worker/APDelivery.php +++ b/src/Worker/APDelivery.php @@ -23,13 +23,8 @@ namespace Friendica\Worker; use Friendica\Core\Logger; use Friendica\Core\Worker; -use Friendica\DI; -use Friendica\Model\Contact; -use Friendica\Model\GServer; use Friendica\Model\Post; use Friendica\Protocol\ActivityPub; -use Friendica\Util\HTTPSignature; - class APDelivery { /** @@ -69,11 +64,11 @@ class APDelivery Logger::debug('Invoked', ['cmd' => $cmd, 'inbox' => $inbox, 'id' => $item_id, 'uri-id' => $uri_id, 'uid' => $uid]); if (empty($uri_id)) { - $result = self::deliver($inbox); + $result = ActivityPub\Delivery::deliver($inbox); $success = $result['success']; $uri_ids = $result['uri_ids']; } else { - $result = self::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id); + $result = ActivityPub\Delivery::deliverToInbox($cmd, $item_id, $inbox, $uid, $receivers, $uri_id); $success = $result['success']; $uri_ids = [$uri_id]; } @@ -85,131 +80,4 @@ class APDelivery } } } - - private static function deliver(string $inbox):array - { - $uri_ids = []; - $posts = Post\Delivery::selectForInbox($inbox); - $serverfail = false; - - foreach ($posts as $post) { - if (!$serverfail) { - $result = self::deliverToInbox($post['command'], 0, $inbox, $post['uid'], $post['receivers'], $post['uri-id']); - - if ($result['serverfailure']) { - // In a timeout situation we assume that every delivery to that inbox will time out. - // So we set the flag and try all deliveries at a later time. - Logger::info('Inbox delivery has a server failure', ['inbox' => $inbox]); - $serverfail = true; - } - } - - if ($serverfail || !$result['success']) { - $uri_ids[] = $post['uri-id']; - } - } - - Logger::debug('Inbox delivery done', ['inbox' => $inbox, 'posts' => count($posts), 'failed' => count($uri_ids), 'serverfailure' => $serverfail]); - return ['success' => empty($uri_ids), 'uri_ids' => $uri_ids]; - } - - private static function deliverToInbox(string $cmd, int $item_id, string $inbox, int $uid, array $receivers, int $uri_id): array - { - if (empty($item_id) && !empty($uri_id) && !empty($uid)) { - $item = Post::selectFirst(['id', 'parent', 'origin'], ['uri-id' => $uri_id, 'uid' => [$uid, 0]], ['order' => ['uid' => true]]); - if (empty($item['id'])) { - Logger::notice('Item not found, removing delivery', ['uri-id' => $uri_id, 'uid' => $uid, 'cmd' => $cmd, 'inbox' => $inbox]); - Post\Delivery::remove($uri_id, $inbox); - return true; - } else { - $item_id = $item['id']; - } - } - - $success = true; - $serverfail = false; - - if ($cmd == Delivery::MAIL) { - $data = ActivityPub\Transmitter::createActivityFromMail($item_id); - if (!empty($data)) { - $success = HTTPSignature::transmit($data, $inbox, $uid); - } - } elseif ($cmd == Delivery::SUGGESTION) { - $success = ActivityPub\Transmitter::sendContactSuggestion($uid, $inbox, $item_id); - } elseif ($cmd == Delivery::RELOCATION) { - // @todo Implementation pending - } elseif ($cmd == Delivery::POKE) { - // Implementation not planned - } elseif ($cmd == Delivery::REMOVAL) { - $success = ActivityPub\Transmitter::sendProfileDeletion($uid, $inbox); - } elseif ($cmd == Delivery::PROFILEUPDATE) { - $success = ActivityPub\Transmitter::sendProfileUpdate($uid, $inbox); - } else { - $data = ActivityPub\Transmitter::createCachedActivityFromItem($item_id); - if (!empty($data)) { - $timestamp = microtime(true); - $response = HTTPSignature::post($data, $inbox, $uid); - $runtime = microtime(true) - $timestamp; - $success = $response->isSuccess(); - $serverfail = $response->isTimeout(); - if (!$success) { - if (!$serverfail && ($response->getReturnCode() >= 500) && ($response->getReturnCode() <= 599)) { - $serverfail = true; - } - - $xrd_timeout = DI::config()->get('system', 'xrd_timeout'); - if (!$serverfail && $xrd_timeout && ($runtime > $xrd_timeout)) { - $serverfail = true; - } - $curl_timeout = DI::config()->get('system', 'curl_timeout'); - if (!$serverfail && $curl_timeout && ($runtime > $curl_timeout)) { - $serverfail = true; - } - - Logger::info('Delivery failed', ['retcode' => $response->getReturnCode(), 'serverfailure' => $serverfail, 'runtime' => round($runtime, 3), 'uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox]); - } - if ($uri_id) { - if ($success) { - Post\Delivery::remove($uri_id, $inbox); - } else { - Post\Delivery::incrementFailed($uri_id, $inbox); - } - } - } - } - - self::setSuccess($receivers, $success); - - Logger::debug('Delivered', ['uri-id' => $uri_id, 'uid' => $uid, 'item_id' => $item_id, 'cmd' => $cmd, 'inbox' => $inbox, 'success' => $success]); - - if ($success && in_array($cmd, [Delivery::POST])) { - Post\DeliveryData::incrementQueueDone($uri_id, Post\DeliveryData::ACTIVITYPUB); - } - - return ['success' => $success, 'serverfailure' => $serverfail]; - } - - private static function setSuccess(array $receivers, bool $success) - { - $gsid = null; - - foreach ($receivers as $receiver) { - $contact = Contact::getById($receiver); - if (empty($contact)) { - continue; - } - - $gsid = $gsid ?: $contact['gsid']; - - if ($success) { - Contact::unmarkForArchival($contact); - } else { - Contact::markForArchival($contact); - } - } - - if (!empty($gsid)) { - GServer::setProtocol($gsid, Post\DeliveryData::ACTIVITYPUB); - } - } } diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index e39ba24aae..01c54fed18 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -95,9 +95,6 @@ class Cron // Clear cache entries Worker::add(PRIORITY_LOW, 'ClearCache'); - // Requeue posts from the post delivery entries - Worker::add(PRIORITY_MEDIUM, 'RequeuePosts'); - DI::config()->set('system', 'last_cron_hourly', time()); } diff --git a/src/Worker/RequeuePosts.php b/src/Worker/RequeuePosts.php deleted file mode 100644 index 568595aafd..0000000000 --- a/src/Worker/RequeuePosts.php +++ /dev/null @@ -1,47 +0,0 @@ -. - * - */ - -namespace Friendica\Worker; - -use Friendica\Core\Logger; -use Friendica\Core\Worker; -use Friendica\Database\DBA; -use Friendica\Model\Post; - -/** - * Requeue posts that are stuck in the post-delivery table without a matching delivery job. - * This should not happen in regular situations, this is a precaution. - */ -class RequeuePosts -{ - public static function execute() - { - $deliveries = DBA::p("SELECT `item-uri`.`uri` AS `inbox` FROM `post-delivery` INNER JOIN `item-uri` ON `item-uri`.`id` = `post-delivery`.`inbox-id` GROUP BY `inbox`"); - while ($delivery = DBA::fetch($deliveries)) { - Post\Delivery::removeFailed($delivery['inbox']); - - if (Worker::add(PRIORITY_HIGH, 'APDelivery', '', 0, $delivery['inbox'], 0)) { - Logger::info('Missing APDelivery worker added for inbox', ['inbox' => $delivery['inbox']]); - } - } - DBA::close($deliveries); - } -} -- 2.39.5