X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=e29cf765a4fa38c5885798ef396cc0c6a9b8f89a;hb=44291a465bb9b9c54b8781d6d6f13e1c3f317c1b;hp=f21513f33c33c72f2f71cae0b91189b5bdda1577;hpb=2a431b580f2e8f6a596e84175932e793678cde63;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index f21513f33c..e29cf765a4 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ isMaxLoadReached()) { + if (DI::system()->isMaxLoadReached()) { Logger::notice('Pre check: maximum load reached, quitting.'); return; } // We now start the process. This is done after the load check since this could increase the load. - DI::process()->start(); + self::$process = $process; // Kill stale processes every 5 minutes $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); @@ -104,8 +104,7 @@ class Worker // Don't refetch when a worker fetches tasks for multiple workers $refetched = DI::config()->get('system', 'worker_multiple_fetch'); foreach ($r as $entry) { - // Assure that the priority is an integer value - $entry['priority'] = (int)$entry['priority']; + $entry = self::checkPriority($entry); // The work will be done if (!self::execute($entry)) { @@ -137,7 +136,7 @@ class Worker } // Check free memory - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); return; @@ -150,7 +149,7 @@ class Worker // Quit the worker once every cron interval if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); + self::unclaimProcess($process); if (self::isDaemonMode()) { self::IPCSetJobState(true); } else { @@ -167,6 +166,24 @@ class Worker Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]); } + /** + * Check and fix the priority of a worker task + * @param array $entry + * @return array + */ + private static function checkPriority(array $entry) + { + $entry['priority'] = (int)$entry['priority']; + + if (!in_array($entry['priority'], PRIORITIES)) { + Logger::warning('Invalid priority', ['entry' => $entry, 'callstack' => System::callstack(20)]); + DBA::update('workerqueue', ['priority' => PRIORITY_MEDIUM], ['id' => $entry['id']]); + $entry['priority'] = PRIORITY_MEDIUM; + } + + return $entry; + } + /** * Checks if the system is ready. * @@ -183,7 +200,7 @@ class Worker } // Do we have too few memory? - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached, quitting.'); return false; } @@ -195,7 +212,7 @@ class Worker } // Possibly there are too much database processes that block the system - if (DI::process()->isMaxProcessesReached()) { + if (DI::system()->isMaxProcessesReached()) { Logger::warning('Maximum processes reached, quitting.'); return false; } @@ -260,7 +277,7 @@ class Worker $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]); self::$db_duration += (microtime(true) - $stamp); if (DBA::isResult($workerqueue)) { - return $workerqueue["priority"]; + return $workerqueue['priority']; } else { return 0; } @@ -280,6 +297,44 @@ class Worker return DBA::exists('workerqueue', $condition); } + /** + * Checks if the given file is valid to be included + * + * @param mixed $file + * @return bool + */ + private static function validateInclude(&$file) + { + $orig_file = $file; + + $file = realpath($file); + + if (strpos($file, getcwd()) !== 0) { + return false; + } + + $file = str_replace(getcwd() . "/", "", $file, $count); + if ($count != 1) { + return false; + } + + if ($orig_file !== $file) { + return false; + } + + $valid = false; + if (strpos($file, "include/") === 0) { + $valid = true; + } + + if (strpos($file, "addon/") === 0) { + $valid = true; + } + + // Simply return flag + return $valid; + } + /** * Execute a worker entry * @@ -299,7 +354,7 @@ class Worker } // Constantly check the number of parallel database processes - if (DI::process()->isMaxProcessesReached()) { + if (DI::system()->isMaxProcessesReached()) { Logger::warning("Max processes reached for process", ['pid' => $mypid]); return false; } @@ -363,7 +418,7 @@ class Worker $include = "include/".$include.".php"; } - if (!validate_include($include)) { + if (!self::validateInclude($include)) { Logger::warning("Include file is not valid", ['file' => $argv[0]]); $stamp = (float)microtime(true); DBA::delete('workerqueue', ['id' => $queue["id"]]); @@ -427,13 +482,13 @@ class Worker $cooldown = DI::config()->get("system", "worker_cooldown", 0); if ($cooldown > 0) { - Logger::info('Pre execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]); + Logger::info('Pre execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]); sleep($cooldown); } Logger::enableWorker($funcname); - Logger::info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]); + Logger::info("Process start.", ['priority' => $queue['priority'], 'id' => $queue["id"]]); $stamp = (float)microtime(true); @@ -441,12 +496,7 @@ class Worker // For this reason the variables have to be initialized. DI::profiler()->reset(); - if (!in_array($queue['priority'], PRIORITIES)) { - Logger::warning('Invalid priority', ['queue' => $queue, 'callstack' => System::callstack(20)]); - $queue['priority'] = PRIORITY_MEDIUM; - } - - $a->queue = $queue; + $a->setQueue($queue); $up_duration = microtime(true) - self::$up_start; @@ -462,7 +512,7 @@ class Worker Logger::disableWorker(); - unset($a->queue); + $a->setQueue([]); $duration = (microtime(true) - $stamp); @@ -490,21 +540,21 @@ class Worker self::$lock_duration = 0; if ($duration > 3600) { - Logger::info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 1 hour.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } elseif ($duration > 600) { - Logger::info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 10 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } elseif ($duration > 300) { - Logger::info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 5 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } elseif ($duration > 120) { - Logger::info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); + Logger::info('Longer than 2 minutes.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration/60, 3)]); } - Logger::info('Process done.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' => round($duration, 3)]); + Logger::info('Process done.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'duration' => round($duration, 3)]); DI::profiler()->saveLog(DI::logger(), "ID " . $queue["id"] . ": " . $funcname); if ($cooldown > 0) { - Logger::info('Post execution cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]); + Logger::info('Post execution cooldown.', ['priority' => $queue['priority'], 'id' => $queue["id"], 'cooldown' => $cooldown]); sleep($cooldown); } } @@ -610,6 +660,8 @@ class Worker self::$db_duration += (microtime(true) - $stamp); while ($entry = DBA::fetch($entries)) { + $entry = self::checkPriority($entry); + if (!posix_kill($entry["pid"], 0)) { $stamp = (float)microtime(true); DBA::update( @@ -621,14 +673,10 @@ class Worker self::$db_duration_write += (microtime(true) - $stamp); } else { // Kill long running processes - // Check if the priority is in a valid range - if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) { - $entry["priority"] = PRIORITY_MEDIUM; - } // Define the maximum durations $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720]; - $max_duration = $max_duration_defaults[$entry["priority"]]; + $max_duration = $max_duration_defaults[$entry['priority']]; $argv = json_decode($entry['parameter'], true); if (!empty($entry['command'])) { @@ -650,12 +698,12 @@ class Worker // We killed the stale process. // To avoid a blocking situation we reschedule the process at the beginning of the queue. // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL) - $new_priority = $entry["priority"]; - if ($entry["priority"] == PRIORITY_HIGH) { + $new_priority = $entry['priority']; + if ($entry['priority'] == PRIORITY_HIGH) { $new_priority = PRIORITY_MEDIUM; - } elseif ($entry["priority"] == PRIORITY_MEDIUM) { + } elseif ($entry['priority'] == PRIORITY_MEDIUM) { $new_priority = PRIORITY_LOW; - } elseif ($entry["priority"] != PRIORITY_CRITICAL) { + } elseif ($entry['priority'] != PRIORITY_CRITICAL) { $new_priority = PRIORITY_NEGLIGIBLE; } $stamp = (float)microtime(true); @@ -715,13 +763,10 @@ class Worker } $stamp = (float)microtime(true); - $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval); + $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); - if ($job = DBA::fetch($jobs)) { - $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0); - } - DBA::close($jobs); + $jobs_per_minute[$interval] = number_format($jobs / $interval, 0); } $processlist = ' - jpm: '.implode('/', $jobs_per_minute); } @@ -742,15 +787,12 @@ class Worker self::$db_duration_stat += (microtime(true) - $stamp); while ($entry = DBA::fetch($jobs)) { $stamp = (float)microtime(true); - $processes = DBA::p("SELECT COUNT(*) AS `running` FROM `workerqueue-view` WHERE `priority` = ?", $entry["priority"]); + $running = DBA::count('workerqueue-view', ['priority' => $entry['priority']]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); - if ($process = DBA::fetch($processes)) { - $idle_workers -= $process["running"]; - $waiting_processes += $entry["entries"]; - $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"]; - } - DBA::close($processes); + $idle_workers -= $running; + $waiting_processes += $entry["entries"]; + $listitem[$entry['priority']] = $entry['priority'] . ":" . $running . "/" . $entry["entries"]; } DBA::close($jobs); } else { @@ -762,7 +804,7 @@ class Worker while ($entry = DBA::fetch($jobs)) { $idle_workers -= $entry["running"]; - $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"]; + $listitem[$entry['priority']] = $entry['priority'].":".$entry["running"]; } DBA::close($jobs); } @@ -813,7 +855,7 @@ class Worker private static function activeWorkers() { $stamp = (float)microtime(true); - $count = DBA::count('process', ['command' => 'Worker.php']); + $count = DI::process()->countCommand('Worker.php'); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_count += (microtime(true) - $stamp); return $count; @@ -831,7 +873,7 @@ class Worker $stamp = (float)microtime(true); $queues = DBA::p("SELECT `process`.`pid`, COUNT(`workerqueue`.`pid`) AS `entries` FROM `process` - LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` + LEFT JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `workerqueue`.`done` GROUP BY `process`.`pid`"); while ($queue = DBA::fetch($queues)) { $ids[$queue['pid']] = $queue['entries']; @@ -1076,15 +1118,15 @@ class Worker /** * Removes a workerqueue entry from the current process * + * @param Process $process the process behind the workerqueue + * * @return void * @throws \Exception */ - public static function unclaimProcess() + public static function unclaimProcess(Process $process) { - $mypid = getmypid(); - $stamp = (float)microtime(true); - DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); + DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $process->pid, 'done' => false]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -1107,6 +1149,32 @@ class Worker // Cleaning dead processes self::killStaleWorkers(); + + // Remove old entries from the workerqueue + self::cleanWorkerQueue(); + } + + /** + * Remove old entries from the workerqueue + * + * @return void + */ + private static function cleanWorkerQueue() + { + DBA::delete('workerqueue', ["`done` AND `executed` < ?", DateTimeFormat::utc('now - 1 hour')]); + + // Optimizing this table only last seconds + if (DI::config()->get('system', 'optimize_tables')) { + // We are acquiring the two locks from the worker to avoid locking problems + if (DI::lock()->acquire(Worker::LOCK_PROCESS, 10)) { + if (DI::lock()->acquire(Worker::LOCK_WORKER, 10)) { + DBA::e("OPTIMIZE TABLE `workerqueue`"); + DBA::e("OPTIMIZE TABLE `process`"); + DI::lock()->release(Worker::LOCK_WORKER); + } + DI::lock()->release(Worker::LOCK_PROCESS); + } + } } /** @@ -1117,7 +1185,7 @@ class Worker */ private static function forkProcess(bool $do_cron) { - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached - quitting'); return; } @@ -1147,26 +1215,25 @@ class Worker } // We now are in the new worker - $pid = getmypid(); - DBA::connect(); - /// @todo Reinitialize the logger to set a new process_id and uid - DI::process()->setPid($pid); + + DI::flushLogger(); + $process = DI::process()->create(getmypid(), basename(__FILE__)); $cycles = 0; - while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) { usleep(10000); } - Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]); + Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]); - self::processQueue($do_cron); + self::processQueue($do_cron, $process); - self::unclaimProcess(); + self::unclaimProcess($process); - self::IPCSetJobState(false, $pid); - DI::process()->end(); - Logger::info('Worker ended', ['pid' => $pid]); + self::IPCSetJobState(false, $process->pid); + DI::process()->delete($process); + Logger::info('Worker ended', ['pid' => $process->pid]); exit(); } @@ -1182,9 +1249,7 @@ class Worker if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { self::forkProcess($do_cron); } else { - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), - DI::modelProcess(), DI::app()->getBasePath(), getmypid()); - $process->run('bin/worker.php', ['no_cron' => !$do_cron]); + DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]); } if (self::isDaemonMode()) { self::IPCSetJobState(false); @@ -1200,7 +1265,7 @@ class Worker * or: Worker::add(PRIORITY_HIGH, "Notifier", Delivery::DELETION, $drop_id); * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "Delivery", $post_id); * - * @return boolean "false" if worker queue entry already existed or there had been an error + * @return int "0" if worker queue entry already existed or there had been an error, otherwise the ID of the worker task * @throws \Friendica\Network\HTTPException\InternalServerErrorException * @note $cmd and string args are surrounded with "" * @@ -1208,19 +1273,17 @@ class Worker * array $arr * */ - public static function add($cmd) + public static function add(...$args) { - $args = func_get_args(); - if (!count($args)) { - return false; + return 0; } $arr = ['args' => $args, 'run_cmd' => true]; Hook::callAll("proc_run", $arr); if (!$arr['run_cmd'] || !count($args)) { - return true; + return 1; } $priority = PRIORITY_MEDIUM; @@ -1250,29 +1313,31 @@ class Worker if (isset($run_parameter['force_priority'])) { $force_priority = $run_parameter['force_priority']; } + } else { + throw new \InvalidArgumentException('Priority number or task parameter array expected as first argument'); } $command = array_shift($args); $parameters = json_encode($args); $found = DBA::exists('workerqueue', ['command' => $command, 'parameter' => $parameters, 'done' => false]); - $added = false; + $added = 0; - if (!in_array($priority, PRIORITIES)) { + if (!is_int($priority) || !in_array($priority, PRIORITIES)) { Logger::warning('Invalid priority', ['priority' => $priority, 'command' => $command, 'callstack' => System::callstack(20)]); $priority = PRIORITY_MEDIUM; } // Quit if there was a database error - a precaution for the update process to 3.5.3 if (DBA::errorNo() != 0) { - return false; + return 0; } if (!$found) { - $added = DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created, - 'priority' => $priority, 'next_try' => $delayed]); - if (!$added) { - return false; + if (!DBA::insert('workerqueue', ['command' => $command, 'parameter' => $parameters, 'created' => $created, + 'priority' => $priority, 'next_try' => $delayed])) { + return 0; } + $added = DBA::lastInsertId(); } elseif ($force_priority) { DBA::update('workerqueue', ['priority' => $priority], ['command' => $command, 'parameter' => $parameters, 'done' => false, 'pid' => 0]); } @@ -1348,16 +1413,18 @@ class Worker * Defers the current worker entry * * @return boolean had the entry been deferred? + * @throws \Exception */ - public static function defer() + public static function defer(): bool { - if (empty(DI::app()->queue)) { + $queue = DI::app()->getQueue(); + + if (empty($queue)) { return false; } - $queue = DI::app()->queue; + $queue = self::checkPriority($queue); - $retrial = $queue['retrial']; $id = $queue['id']; $priority = $queue['priority']; @@ -1542,8 +1609,7 @@ class Worker Logger::notice('Starting new daemon process'); $command = 'bin/daemon.php'; $a = DI::app(); - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid()); - $process->run($command, ['start']); + DI::system()->run($command, ['start']); Logger::notice('New daemon process started'); } @@ -1587,7 +1653,7 @@ class Worker } else { Logger::info('We are outside the maintenance window', ['current' => date('H:i:s', $current), 'start' => date('H:i:s', $start), 'end' => date('H:i:s', $end)]); } - + return $execute; } }