X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=d43d8c7281008198beac9b812ee9bd160fbf9d56;hb=d5373c583bc12d2104659aaf6386d2b9e28ec68d;hp=3d1648ea45936bfee7b81cb80ac1923c57012b56;hpb=0e2e488521fbcf2d52dc8037ee6e9dd577fbf14c;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index 3d1648ea45..d43d8c7281 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -1,6 +1,6 @@ isMaxLoadReached()) { + if (DI::system()->isMaxLoadReached()) { Logger::notice('Pre check: maximum load reached, quitting.'); return; } // We now start the process. This is done after the load check since this could increase the load. - DI::process()->start(); + self::$process = $process; // Kill stale processes every 5 minutes $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0); @@ -134,7 +138,7 @@ class Worker } // Check free memory - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached, quitting.'); DI::lock()->release(self::LOCK_WORKER); return; @@ -147,7 +151,7 @@ class Worker // Quit the worker once every cron interval if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) { Logger::info('Process lifetime reached, respawning.'); - self::unclaimProcess(); + self::unclaimProcess($process); if (self::isDaemonMode()) { self::IPCSetJobState(true); } else { @@ -180,7 +184,7 @@ class Worker } // Do we have too few memory? - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached, quitting.'); return false; } @@ -192,7 +196,7 @@ class Worker } // Possibly there are too much database processes that block the system - if (DI::process()->isMaxProcessesReached()) { + if (DI::system()->isMaxProcessesReached()) { Logger::warning('Maximum processes reached, quitting.'); return false; } @@ -334,7 +338,7 @@ class Worker } // Constantly check the number of parallel database processes - if (DI::process()->isMaxProcessesReached()) { + if (DI::system()->isMaxProcessesReached()) { Logger::warning("Max processes reached for process", ['pid' => $mypid]); return false; } @@ -750,7 +754,7 @@ class Worker } $stamp = (float)microtime(true); - $jobs = DBA::count('workerqueue', ["`done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval]); + $jobs = DBA::count('workerqueue', ["`done` AND `executed` > ?", DateTimeFormat::utc('now - ' . $interval . ' minute')]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_stat += (microtime(true) - $stamp); $jobs_per_minute[$interval] = number_format($jobs / $interval, 0); @@ -842,7 +846,7 @@ class Worker private static function activeWorkers() { $stamp = (float)microtime(true); - $count = DBA::count('process', ['command' => 'Worker.php']); + $count = DI::process()->countCommand('Worker.php'); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_count += (microtime(true) - $stamp); return $count; @@ -1105,15 +1109,15 @@ class Worker /** * Removes a workerqueue entry from the current process * + * @param Process $process the process behind the workerqueue + * * @return void * @throws \Exception */ - public static function unclaimProcess() + public static function unclaimProcess(Process $process) { - $mypid = getmypid(); - $stamp = (float)microtime(true); - DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]); + DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $process->pid, 'done' => false]); self::$db_duration += (microtime(true) - $stamp); self::$db_duration_write += (microtime(true) - $stamp); } @@ -1146,7 +1150,7 @@ class Worker */ private static function forkProcess(bool $do_cron) { - if (DI::process()->isMinMemoryReached()) { + if (DI::system()->isMinMemoryReached()) { Logger::warning('Memory limit reached - quitting'); return; } @@ -1176,26 +1180,25 @@ class Worker } // We now are in the new worker - $pid = getmypid(); - DBA::connect(); - /// @todo Reinitialize the logger to set a new process_id and uid - DI::process()->setPid($pid); + + DI::flushLogger(); + $process = DI::process()->create(getmypid(), basename(__FILE__)); $cycles = 0; - while (!self::IPCJobsExists($pid) && (++$cycles < 100)) { + while (!self::IPCJobsExists($process->pid) && (++$cycles < 100)) { usleep(10000); } - Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]); + Logger::info('Worker spawned', ['pid' => $process->pid, 'wait_cycles' => $cycles]); - self::processQueue($do_cron); + self::processQueue($do_cron, $process); - self::unclaimProcess(); + self::unclaimProcess($process); - self::IPCSetJobState(false, $pid); - DI::process()->end(); - Logger::info('Worker ended', ['pid' => $pid]); + self::IPCSetJobState(false, $process->pid); + DI::process()->delete($process); + Logger::info('Worker ended', ['pid' => $process->pid]); exit(); } @@ -1211,9 +1214,7 @@ class Worker if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) { self::forkProcess($do_cron); } else { - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), - DI::modelProcess(), DI::app()->getBasePath(), getmypid()); - $process->run('bin/worker.php', ['no_cron' => !$do_cron]); + DI::system()->run('bin/worker.php', ['no_cron' => !$do_cron]); } if (self::isDaemonMode()) { self::IPCSetJobState(false); @@ -1571,8 +1572,7 @@ class Worker Logger::notice('Starting new daemon process'); $command = 'bin/daemon.php'; $a = DI::app(); - $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid()); - $process->run($command, ['start']); + DI::system()->run($command, ['start']); Logger::notice('New daemon process started'); }