From 7580c8ed1151c77f816cb8910ccbfe6549ad7c63 Mon Sep 17 00:00:00 2001 From: Michael Vogel Date: Sat, 23 Jul 2016 22:57:22 +0200 Subject: [PATCH] Processing of high priority processes --- include/poller.php | 51 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/include/poller.php b/include/poller.php index bf25549b54..3f1c1d3089 100644 --- a/include/poller.php +++ b/include/poller.php @@ -39,7 +39,7 @@ function poller_run(&$argv, &$argc){ return; // Checking the number of workers - if (poller_too_much_workers()) { + if (poller_too_much_workers($entries, $queues, $maxqueues)) { poller_kill_stale_workers(); return; } @@ -58,13 +58,15 @@ function poller_run(&$argv, &$argc){ sleep(4); // Checking number of workers - if (poller_too_much_workers()) + if (poller_too_much_workers($entries, $queues, $maxqueues)) return; $cooldown = Config::get("system", "worker_cooldown", 0); $starttime = time(); + $delayed = 0; + while ($r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `created` LIMIT 1")) { // Constantly check the number of parallel database processes @@ -76,9 +78,37 @@ function poller_run(&$argv, &$argc){ return; // Count active workers and compare them with a maximum value that depends on the load - if (poller_too_much_workers()) + if (poller_too_much_workers($entries, $queues, $maxqueues)) return; + $argv = json_decode($r[0]["parameter"]); + + $argc = count($argv); + + $funcname = str_replace(".php", "", basename($argv[0]))."_run"; + + // Define the processes that have priority over any other process + /// @todo Better check for priority processes + $priority = array("delivery_run", "notifier_run", "pubsubpublish_run"); + + // Delay other processes if the system has a high load and there are many pending processes. + // The "delayed" value is a safety mechanism for the case when there are no high priority processes in the queue. + if ((($queues / $maxqueues) <= 0.5) AND (($entries > 100) AND ($delayed < 100)) AND !in_array($funcname, $priority)) { + q("UPDATE `workerqueue` SET `created` = '%s' WHERE `id` = %d", + dbesc(datetime_convert()), + intval($r[0]["id"])); + + // Count the number of delayed processes + ++$delayed; + logger("Delay call to '".$funcname."' for performance reasons (Delay ".$delayed.")", LOGGER_DEBUG); + continue; + } + + // If we delivered a process that has high priority we reset the delayed counter. + // When we reached the limit we will process any entry until we reach a high priority process. + if (in_array($funcname, $priority)) + $delayed = 0; + q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `executed` = '0000-00-00 00:00:00'", dbesc(datetime_convert()), intval(getmypid()), @@ -93,10 +123,6 @@ function poller_run(&$argv, &$argc){ continue; } - $argv = json_decode($r[0]["parameter"]); - - $argc = count($argv); - // Check for existance and validity of the include file $include = $argv[0]; @@ -108,8 +134,6 @@ function poller_run(&$argv, &$argc){ require_once($include); - $funcname=str_replace(".php", "", basename($argv[0]))."_run"; - if (function_exists($funcname)) { logger("Process ".getmypid()." - ID ".$r[0]["id"].": ".$funcname." ".$r[0]["parameter"]); $funcname($argv, $argc); @@ -244,13 +268,17 @@ function poller_kill_stale_workers() { } } -function poller_too_much_workers() { +function poller_too_much_workers(&$entries, &$queues, &$maxqueues) { + + $entries = 0; $queues = get_config("system", "worker_queues"); if ($queues == 0) $queues = 4; + $maxqueues = $queues; + $active = poller_active_workers(); // Decrease the number of workers at higher load @@ -268,8 +296,9 @@ function poller_too_much_workers() { $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent)); $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00'"); + $entries = $s[0]["total"]; - logger("Current load: ".$load." - maximum: ".$maxsysload." - current queues: ".$active."/".$s[0]["total"]." - maximum: ".$queues, LOGGER_DEBUG); + logger("Current load: ".$load." - maximum: ".$maxsysload." - current queues: ".$active."/".$entries." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG); } -- 2.39.5