X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=src%2FCore%2FWorker.php;h=8960780e55618bfa6b9b0472c1ebf1eeb8f208a7;hb=962fbc916647a0ab8fa3a6d73a3886741cdbe707;hp=e3d8df45e40625eb2ce42b31d7328f6639f549f9;hpb=c9f02d534e2016acf18d7fa18db193d056495841;p=friendica.git diff --git a/src/Core/Worker.php b/src/Core/Worker.php index e3d8df45e4..8960780e55 100644 --- a/src/Core/Worker.php +++ b/src/Core/Worker.php @@ -54,9 +54,9 @@ class Worker self::startProcess(); // Kill stale processes every 5 minutes - $last_cleanup = Config::get('system', 'poller_last_cleaned', 0); + $last_cleanup = Config::get('system', 'worker_last_cleaned', 0); if (time() > ($last_cleanup + 300)) { - Config::set('system', 'poller_last_cleaned', time()); + Config::set('system', 'worker_last_cleaned', time()); self::killStaleWorkers(); } @@ -108,16 +108,16 @@ class Worker } // If possible we will fetch new jobs for this worker - if (!$refetched && Lock::set('poller_worker_process', 0)) { + if (!$refetched && Lock::set('worker_process', 0)) { $stamp = (float)microtime(true); $refetched = self::findWorkerProcesses($passing_slow); self::$db_duration += (microtime(true) - $stamp); - Lock::remove('poller_worker_process'); + Lock::remove('worker_process'); } } // To avoid the quitting of multiple workers only one worker at a time will execute the check - if (Lock::set('poller_worker', 0)) { + if (Lock::set('worker', 0)) { $stamp = (float)microtime(true); // Count active workers and compare them with a maximum value that depends on the load if (self::tooMuchWorkers()) { @@ -130,7 +130,7 @@ class Worker logger('Memory limit reached, quitting.', LOGGER_DEBUG); return; } - Lock::remove('poller_worker'); + Lock::remove('worker'); self::$db_duration += (microtime(true) - $stamp); } @@ -140,6 +140,11 @@ class Worker return; } } + + // Cleaning up. Possibly not needed, but it doesn't harm anything. + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(false); + } logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG); } @@ -150,12 +155,7 @@ class Worker */ private static function totalEntries() { - $s = dba::fetch_first("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done`", NULL_DATE); - if (DBM::is_result($s)) { - return $s["total"]; - } else { - return 0; - } + return dba::count('workerqueue', ["`executed` <= ? AND NOT `done`", NULL_DATE]); } /** @@ -244,7 +244,7 @@ class Worker $stamp = (float)microtime(true); if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) { - Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow()); + Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); @@ -285,7 +285,7 @@ class Worker $stamp = (float)microtime(true); if (dba::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) { - Config::set('system', 'last_poller_execution', DateTimeFormat::utcNow()); + Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow()); } self::$db_duration = (microtime(true) - $stamp); } else { @@ -688,12 +688,21 @@ class Worker logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG); // Are there fewer workers running as possible? Then fork a new one. - if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) { + if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) { logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG); - self::spawnWorker(); + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(true); + } else { + self::spawnWorker(); + } } } + // if there are too much worker, we down't spawn a new one. + if (Config::get('system', 'worker_daemon_mode', false) && ($active >= $queues)) { + self::IPCSetJobState(false); + } + return $active >= $queues; } @@ -704,9 +713,7 @@ class Worker */ private static function activeWorkers() { - $workers = dba::fetch_first("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'Worker.php'"); - - return $workers["processes"]; + return dba::count('process', ['command' => 'Worker.php']); } /** @@ -873,7 +880,7 @@ class Worker dba::close($r); $stamp = (float)microtime(true); - if (!Lock::set('poller_worker_process')) { + if (!Lock::set('worker_process')) { return false; } self::$lock_duration = (microtime(true) - $stamp); @@ -882,7 +889,7 @@ class Worker $found = self::findWorkerProcesses($passing_slow); self::$db_duration += (microtime(true) - $stamp); - Lock::remove('poller_worker_process'); + Lock::remove('worker_process'); if ($found) { $r = dba::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]); @@ -959,9 +966,9 @@ class Worker self::clearProcesses(); - $workers = dba::fetch_first("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php'"); + $workers = self::activeWorkers(); - if ($workers["processes"] == 0) { + if ($workers == 0) { self::callWorker(); } } @@ -999,12 +1006,23 @@ class Worker } /** + * @brief Spawns a new worker * @return void */ - public static function spawnWorker() + public static function spawnWorker($do_cron = false) { - $args = ["bin/worker.php", "no_cron"]; + $args = ["bin/worker.php"]; + + if (!$do_cron) { + $args[] = "no_cron"; + } + get_app()->proc_run($args); + + // after spawning we have to remove the flag. + if (Config::get('system', 'worker_daemon_mode', false)) { + self::IPCSetJobState(false); + } } /** @@ -1039,7 +1057,7 @@ class Worker } $priority = PRIORITY_MEDIUM; - $dont_fork = Config::get("system", "worker_dont_fork"); + $dont_fork = Config::get("system", "worker_dont_fork", false); $created = DateTimeFormat::utcNow(); $run_parameter = array_shift($args); @@ -1076,18 +1094,24 @@ class Worker } // If there is a lock then we don't have to check for too much worker - if (!Lock::set('poller_worker', 0)) { + if (!Lock::set('worker', 0)) { return true; } // If there are already enough workers running, don't fork another one $quit = self::tooMuchWorkers(); - Lock::remove('poller_worker'); + Lock::remove('worker'); if ($quit) { return true; } + // We tell the daemon that a new job entry exists + if (Config::get('system', 'worker_daemon_mode', false)) { + // We don't have to set the IPC flag - this is done in "tooMuchWorkers" + return true; + } + // Now call the worker to execute the jobs that we just added to the queue self::spawnWorker(); @@ -1120,4 +1144,33 @@ class Worker { return Process::deleteByPid(); } + + /** + * Set the flag if some job is waiting + * + * @brief Set the flag if some job is waiting + * @param boolean $jobs Is there a waiting job? + */ + public static function IPCSetJobState($jobs) + { + dba::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true); + } + + /** + * Checks if some worker job waits to be executed + * + * @brief Checks if some worker job waits to be executed + * @return bool + */ + public static function IPCJobsExists() + { + $row = dba::selectFirst('worker-ipc', ['jobs'], ['key' => 1]); + + // When we don't have a row, no job is running + if (!DBM::is_result($row)) { + return false; + } + + return (bool)$row['jobs']; + } }