require_once("boot.php");
function poller_run($argv, $argc){
- global $a, $db, $poller_up_start;
+ global $a, $db, $poller_up_start, $poller_db_duration;
$poller_up_start = microtime(true);
poller_run_cron();
}
- $refetched = false;
-
$starttime = time();
// We fetch the next queue entry that is about to be executed
while ($r = poller_worker_process()) {
+
+ $refetched = false;
+
foreach ($r AS $entry) {
// Assure that the priority is an integer value
$entry['priority'] = (int)$entry['priority'];
logger('Process execution failed, quitting.', LOGGER_DEBUG);
return;
}
+
+ // If possible we will fetch new jobs for this worker
+ if (!$refetched && Lock::set('poller_worker_process', 0)) {
+ $stamp = (float)microtime(true);
+ $refetched = find_worker_processes();
+ $poller_db_duration += (microtime(true) - $stamp);
+ Lock::remove('poller_worker_process');
+ }
}
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
if (Lock::set('poller_worker', 0)) {
+ $stamp = (float)microtime(true);
// Count active workers and compare them with a maximum value that depends on the load
if (poller_too_much_workers()) {
logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
return;
}
Lock::remove('poller_worker');
+ $poller_db_duration += (microtime(true) - $stamp);
}
// Quit the poller once every 5 minutes
logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
return;
}
-
- // If possible we will fetch new jobs for this worker
- if (!$refetched && Lock::set('poller_worker_process', 0)) {
- $refetched = find_worker_processes();
- Lock::remove('poller_worker_process');
- }
-
}
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
}
* @return boolean "true" if further processing should be stopped
*/
function poller_execute($queue) {
- global $poller_db_duration;
+ global $poller_db_duration, $poller_last_update;
$a = get_app();
$funcname = str_replace(".php", "", basename($argv[0]))."_run";
if (function_exists($funcname)) {
+
+ // We constantly update the "executed" date every minute to avoid being killed to soon
+ if (!isset($poller_last_update)) {
+ $poller_last_update = strtotime($queue["executed"]);
+ }
+
+ $age = (time() - $poller_last_update) / 60;
+ $poller_last_update = time();
+
+ if ($age > 1) {
+ $stamp = (float)microtime(true);
+ dba::update('workerqueue', array('executed' => datetime_convert()), array('pid' => $mypid, 'done' => false));
+ $poller_db_duration += (microtime(true) - $stamp);
+ }
+
poller_exec_function($queue, $funcname, $argv);
$stamp = (float)microtime(true);
*
*/
function poller_kill_stale_workers() {
- $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE));
+ $entries = dba::p("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0", NULL_DATE);
- if (!dbm::is_result($r)) {
- // No processing here needed
- return;
- }
-
- foreach ($r AS $pid) {
- if (!posix_kill($pid["pid"], 0)) {
+ while ($entry = dba::fetch($entries)) {
+ if (!posix_kill($entry["pid"], 0)) {
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
- array('pid' => $pid["pid"], 'done' => false));
+ array('pid' => $entry["pid"], 'done' => false));
} else {
// Kill long running processes
-
// Check if the priority is in a valid range
- if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
- $pid["priority"] = PRIORITY_MEDIUM;
+ if (!in_array($entry["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
+ $entry["priority"] = PRIORITY_MEDIUM;
}
// Define the maximum durations
- $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
- $max_duration = $max_duration_defaults[$pid["priority"]];
+ $max_duration_defaults = array(PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720);
+ $max_duration = $max_duration_defaults[$entry["priority"]];
- $argv = json_decode($pid["parameter"]);
+ $argv = json_decode($entry["parameter"]);
$argv[0] = basename($argv[0]);
// How long is the process already running?
- $duration = (time() - strtotime($pid["executed"])) / 60;
+ $duration = (time() - strtotime($entry["executed"])) / 60;
if ($duration > $max_duration) {
- logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
- posix_kill($pid["pid"], SIGTERM);
+ logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
+ posix_kill($entry["pid"], SIGTERM);
// We killed the stale process.
// To avoid a blocking situation we reschedule the process at the beginning of the queue.
- // Additionally we are lowering the priority.
+ // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
+ if ($entry["priority"] == PRIORITY_HIGH) {
+ $new_priority = PRIORITY_MEDIUM;
+ } elseif ($entry["priority"] == PRIORITY_MEDIUM) {
+ $new_priority = PRIORITY_LOW;
+ } elseif ($entry["priority"] != PRIORITY_CRITICAL) {
+ $new_priority = PRIORITY_NEGLIGIBLE;
+ }
dba::update('workerqueue',
- array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
- array('pid' => $pid["pid"], 'done' => false));
+ array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => $new_priority, 'pid' => 0),
+ array('pid' => $entry["pid"], 'done' => false));
} else {
- logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
+ logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
}
}
}
}
dba::close($entries);
- $jobs_per_minute = 0;
-
- $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE");
- if ($job = dba::fetch($jobs)) {
- $jobs_per_minute = number_format($job['jobs'] / 10, 0);
+ $intervals = array(1, 10, 60);
+ $jobs_per_minute = array();
+ foreach ($intervals AS $interval) {
+ $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ".intval($interval)." MINUTE");
+ if ($job = dba::fetch($jobs)) {
+ $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0);
+ }
+ dba::close($jobs);
}
- dba::close($jobs);
-
- $processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
+ $processlist = ' - jpm: '.implode('/', $jobs_per_minute).' ('.implode(', ', $listitem).')';
}
$entries = poller_total_entries();
if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) {
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
$args = array("include/poller.php", "no_cron");
- $a = get_app();
- $a->proc_run($args);
+ get_app()->proc_run($args);
}
}
* @return boolean Have we found something?
*/
function find_worker_processes() {
+
+ $mypid = getmypid();
+
// Check if we should pass some low priority process
$highest_priority = 0;
$found = false;
// The higher the number of parallel workers, the more we prefetch to prevent concurring access
- $limit = Config::get("system", "worker_queues", 4) * 2;
+ $limit = Config::get("system", "worker_queues", 4);
$limit = Config::get('system', 'worker_fetch_limit', $limit);
if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest?
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
+ $result = dba::p("SELECT `id` FROM `workerqueue`
WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
- if ($result) {
- $found = (dba::affected_rows() > 0);
+ NULL_DATE, $highest_priority);
+
+ while ($id = dba::fetch($result)) {
+ $ids[] = $id["id"];
}
+ dba::close($result);
+
+ $found = (count($ids) > 0);
if (!$found) {
// Give slower processes some processing time
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
+ $result = dba::p("SELECT `id` FROM `workerqueue`
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
- if ($result) {
- $found = (dba::affected_rows() > 0);
+ NULL_DATE, $highest_priority);
+
+ while ($id = dba::fetch($result)) {
+ $ids[] = $id["id"];
}
+ dba::close($result);
+
+ $found = (count($ids) > 0);
}
}
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
if (!$found) {
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), getmypid(), NULL_DATE);
- if ($result) {
- $found = (dba::affected_rows() > 0);
+ $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE);
+
+ while ($id = dba::fetch($result)) {
+ $ids[] = $id["id"];
}
+ dba::close($result);
+
+ $found = (count($ids) > 0);
}
+
+ if ($found) {
+ $sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;";
+ array_unshift($ids, datetime_convert(), $mypid);
+ dba::e($sql, $ids);
+ }
+
return $found;
}
logger('Call poller', LOGGER_DEBUG);
$args = array("include/poller.php", "no_cron");
- $a = get_app();
- $a->proc_run($args);
+ get_app()->proc_run($args);
return;
}
get_app()->end_process();
- Lock::remove('poller_worker');
- Lock::remove('poller_worker_process');
-
killme();
}