X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=90bce0a88e5807df6fa0f9eed52829c6f4440fef;hb=4e4eab7548c6c7bb7f096beb39419fef276af500;hp=47da04b35cfe9b8d2a39775e4022ecf130deb340;hpb=a6d060b0e4c7ce046211619eb6ae20492998af6d;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 47da04b35c..90bce0a88e 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ get('system', 'worker_last_cleaned', 0); + $last_cleanup = DI::keyValue()->get('worker_last_cleaned') ?? 0; if (time() > ($last_cleanup + 300)) { - DI::config()->set('system', 'worker_last_cleaned', time()); + DI::keyValue()->set( 'worker_last_cleaned', time()); Worker\Cron::killStaleWorkers(); } @@ -331,7 +331,7 @@ class Worker $mypid = getmypid(); // Quit when in maintenance - if (DI::config()->get('system', 'maintenance', false, true)) { + if (DI::config()->get('system', 'maintenance', false)) { Logger::notice('Maintenance mode - quit process', ['pid' => $mypid]); return false; } @@ -388,7 +388,7 @@ class Worker $stamp = (float)microtime(true); $condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()]; if (DBA::update('workerqueue', ['done' => true], $condition)) { - DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); + DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); @@ -429,7 +429,7 @@ class Worker $stamp = (float)microtime(true); if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) { - DI::config()->set('system', 'last_worker_execution', DateTimeFormat::utcNow()); + DI::keyValue()->set('last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); @@ -462,7 +462,7 @@ class Worker return false; } - $load = System::getLoadAvg(); + $load = System::getLoadAvg($processes_cooldown != 0); if (empty($load)) { return false; } @@ -508,7 +508,7 @@ class Worker $sleeping = false; - while ($load = System::getLoadAvg()) { + while ($load = System::getLoadAvg($processes_cooldown != 0)) { if (($load_cooldown > 0) && ($load['average1'] > $load_cooldown)) { if (!$sleeping) { Logger::notice('Load induced pre execution cooldown.', ['max' => $load_cooldown, 'load' => $load, 'called-by' => System::callstack(1)]); @@ -574,7 +574,7 @@ class Worker // No need to defer a worker queue entry if the arguments are invalid Logger::notice('Wrong worker arguments', ['class' => $funcname, 'argv' => $argv, 'queue' => $queue, 'message' => $e->getMessage()]); } catch (\Throwable $e) { - Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString()]); + Logger::error('Uncaught exception in worker execution', ['class' => get_class($e), 'message' => $e->getMessage(), 'code' => $e->getCode(), 'file' => $e->getFile() . ':' . $e->getLine(), 'trace' => $e->getTraceAsString(), 'previous' => $e->getPrevious()]); Worker::defer(); } } else { @@ -1264,7 +1264,7 @@ class Worker $command = array_shift($args); $parameters = json_encode($args); - $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]); + $queue = DBA::selectFirst('workerqueue', ['id', 'priority'], ['command' => $command, 'parameter' => $parameters, 'done' => false]); $added = 0; if (!is_int($priority) || !in_array($priority, self::PRIORITIES)) { @@ -1277,14 +1277,17 @@ class Worker return 0; } - if (!$found) { + if (empty($queue)) { if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created, 'priority' => $priority, 'next_try' => $delayed])) { return 0; } $added = DBA::lastInsertId(); } elseif ($force_priority) { - DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + $ret = DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); + if ($ret && ($priority != $queue['priority'])) { + $added = $queue['id']; + } } // Set the IPC flag to ensure an immediate process execution via daemon @@ -1312,8 +1315,8 @@ class Worker return $added; } - // Quit on daemon mode - if (Worker\Daemon::isMode()) { + // Quit on daemon mode, except the priority is critical (like for db updates) + if (Worker\Daemon::isMode() && $priority !== self::PRIORITY_CRITICAL) { return $added; } @@ -1422,7 +1425,7 @@ class Worker $duration = max($start, $end) - min($start, $end); // Quit when the last cron execution had been after the previous window - $last_cron = DI::config()->get('system', 'last_cron_daily'); + $last_cron = DI::keyValue()->get('last_cron_daily'); if ($last_cron + $duration > time()) { Logger::info('The Daily cron had been executed recently', ['last' => date(DateTimeFormat::MYSQL, $last_cron), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); return false;