- /**
- * fix the queue entry if the worker process died
- *
- * @return void
- * @throws \Exception
- */
- private static function killStaleWorkers()
- {
- $stamp = (float)microtime(true);
- $entries = DBA::select(
- 'workerqueue',
- ['id', 'pid', 'executed', 'priority', 'command', 'parameter'],
- ['NOT `done` AND `pid` != 0'],
- ['order' => ['priority', 'retrial', 'created']]
- );
- self::$db_duration += (microtime(true) - $stamp);
-
- while ($entry = DBA::fetch($entries)) {
- if (!posix_kill($entry["pid"], 0)) {
- $stamp = (float)microtime(true);
- DBA::update(
- 'workerqueue',
- ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
- ['id' => $entry["id"]]
- );
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- } else {
- // Kill long running processes
- // Check if the priority is in a valid range
- if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) {
- $entry["priority"] = PRIORITY_MEDIUM;
- }
-
- // Define the maximum durations
- $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
- $max_duration = $max_duration_defaults[$entry["priority"]];
-
- $argv = json_decode($entry['parameter'], true);
- if (!empty($entry['command'])) {
- $command = $entry['command'];
- } elseif (!empty($argv)) {
- $command = array_shift($argv);
- } else {
- return;
- }
-
- $command = basename($command);
-
- // How long is the process already running?
- $duration = (time() - strtotime($entry["executed"])) / 60;
- if ($duration > $max_duration) {
- Logger::notice('Worker process took too much time - killed', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
- posix_kill($entry["pid"], SIGTERM);
-
- // We killed the stale process.
- // To avoid a blocking situation we reschedule the process at the beginning of the queue.
- // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
- $new_priority = $entry["priority"];
- if ($entry["priority"] == PRIORITY_HIGH) {
- $new_priority = PRIORITY_MEDIUM;
- } elseif ($entry["priority"] == PRIORITY_MEDIUM) {
- $new_priority = PRIORITY_LOW;
- } elseif ($entry["priority"] != PRIORITY_CRITICAL) {
- $new_priority = PRIORITY_NEGLIGIBLE;
- }
- $stamp = (float)microtime(true);
- DBA::update(
- 'workerqueue',
- ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
- ['id' => $entry["id"]]
- );
- self::$db_duration += (microtime(true) - $stamp);
- self::$db_duration_write += (microtime(true) - $stamp);
- } else {
- Logger::info('Process runtime is okay', ['duration' => number_format($duration, 3), 'max' => $max_duration, 'id' => $entry["id"], 'pid' => $entry["pid"], 'command' => $command]);
- }
- }
- }
- DBA::close($entries);
- }