X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=d43d8c7281008198beac9b812ee9bd160fbf9d56;hb=d5373c583bc12d2104659aaf6386d2b9e28ec68d;hp=0431c79521507e646bc4b0b990e2fe892ccd4d8e;hpb=6f290607de7f10cea7429aacd0b394fd3f4c4e69;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 0431c79521..d43d8c7281 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ isMaxLoadReached()) { + if (DI::system()->isMaxLoadReached()) { Logger::notice('Pre check: maximum load reached, quitting.'); return; } // We now start the process. This is done after the load check since this could increase the load. - DI::process()->start(); + self::$process = $process; // Kill stale processes every 5 minutes $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); @@ -134,7 +138,7 @@ class Worker } // Check free memory - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); return; @@ -147,7 +151,7 @@ class Worker // Quit the worker once every cron interval if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); + self::unclaimProcess($process); if (self::isDaemonMode()) { self::IPCSetJobState(true); } else { @@ -180,7 +184,7 @@ class Worker } // Do we have too few memory? - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached, quitting.'); return false; } @@ -192,7 +196,7 @@ class Worker } // Possibly there are too much database processes that block the system - if (DI::process()->isMaxProcessesReached()) { + if (DI::system()->isMaxProcessesReached()) { Logger::warning('Maximum processes reached, quitting.'); return false; } @@ -277,6 +281,44 @@ class Worker return DBA::exists('workerqueue', $condition); } + /** + * Checks if the given file is valid to be included + * + * @param mixed $file + * @return bool + */ + private static function validateInclude(&$file) + { + $orig_file = $file; + + $file = realpath($file); + + if (strpos($file, getcwd()) !== 0) { + return false; + } + + $file = str_replace(getcwd() . "/", "", $file, $count); + if ($count != 1) { + return false; + } + + if ($orig_file !== $file) { + return false; + } + + $valid = false; + if (strpos($file, "include/") === 0) { + $valid = true; + } + + if (strpos($file, "addon/") === 0) { + $valid = true; + } + + // Simply return flag + return $valid; + } + /** * Execute a worker entry * @@ -296,7 +338,7 @@ class Worker } // Constantly check the number of parallel database processes - if (DI::process()->isMaxProcessesReached()) { + if (DI::system()->isMaxProcessesReached()) { Logger::warning("Max processes reached for process", ['pid' => $mypid]); return false; } @@ -360,7 +402,7 @@ class Worker $include = "include/".$include.".php"; } - if (!validate_include($include)) { + if (!self::validateInclude($include)) { Logger::warning("Include file is not valid", ['file' => $argv[0]]); $stamp = (float)microtime(true); DBA::delete('workerqueue', ['id' => $queue["id"]]); @@ -712,7 +754,7 @@ class Worker } $stamp = (float)microtime(true); - $jobs = DBA::count('workerqueue', ["`done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval]); + $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); $jobs_per_minute[$interval] = number_format($jobs / $interval, 0); @@ -804,7 +846,7 @@ class Worker private static function activeWorkers() { $stamp = (float)microtime(true); - $count = DBA::count('process', ['command' => 'Worker.php']); + $count = DI::process()->countCommand('Worker.php'); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_count += (microtime(true) - $stamp); return $count; @@ -1067,15 +1109,15 @@ class Worker /** * Removes a workerqueue entry from the current process * + * @param Process $process the process behind the workerqueue + * * @return void * @throws \Exception */ - public static function unclaimProcess() + public static function unclaimProcess(Process $process) { - $mypid = getmypid(); - $stamp = (float)microtime(true); - DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); + DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $process->pid, 'done' => false]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -1108,7 +1150,7 @@ class Worker */ private static function forkProcess(bool $do_cron) { - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached - quitting'); return; } @@ -1138,26 +1180,25 @@ class Worker } // We now are in the new worker - $pid = getmypid(); - DBA::connect(); - /// @todo Reinitialize the logger to set a new process_id and uid - DI::process()->setPid($pid); + + DI::flushLogger(); + $process = DI::process()->create(getmypid(), basename(__FILE__)); $cycles = 0; - while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) { usleep(10000); } - Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]); + Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]); - self::processQueue($do_cron); + self::processQueue($do_cron, $process); - self::unclaimProcess(); + self::unclaimProcess($process); - self::IPCSetJobState(false, $pid); - DI::process()->end(); - Logger::info('Worker ended', ['pid' => $pid]); + self::IPCSetJobState(false, $process->pid); + DI::process()->delete($process); + Logger::info('Worker ended', ['pid' => $process->pid]); exit(); } @@ -1173,9 +1214,7 @@ class Worker if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { self::forkProcess($do_cron); } else { - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), - DI::modelProcess(), DI::app()->getBasePath(), getmypid()); - $process->run('bin/worker.php', ['no_cron' => !$do_cron]); + DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]); } if (self::isDaemonMode()) { self::IPCSetJobState(false); @@ -1533,8 +1572,7 @@ class Worker Logger::notice('Starting new daemon process'); $command = 'bin/daemon.php'; $a = DI::app(); - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid()); - $process->run($command, ['start']); + DI::system()->run($command, ['start']); Logger::notice('New daemon process started'); }