poller_run_cron();
}
+ $refetched = false;
+
$starttime = time();
// We fetch the next queue entry that is about to be executed
logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
return;
}
+
+ // If possible we will fetch new jobs for this worker
+ if (!$refetched && Lock::set('poller_worker_process', 0)) {
+ $refetched = find_worker_processes();
+ Lock::remove('poller_worker_process');
+ }
+
}
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
}
* @param array $argv Array of values to be passed to the function
*/
function poller_exec_function($queue, $funcname, $argv) {
- global $poller_up_start, $poller_db_duration;
+ global $poller_up_start, $poller_db_duration, $poller_lock_duration;
$a = get_app();
* The execution time is the productive time.
* By changing parameters like the maximum number of workers we can check the effectivness.
*/
- logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
+ logger('DB: '.number_format($poller_db_duration, 2).
+ ' - Lock: '.number_format($poller_lock_duration, 2).
+ ' - Rest: '.number_format($up_duration - $poller_db_duration - $poller_lock_duration, 2).
+ ' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
+ $poller_lock_duration = 0;
if ($duration > 3600) {
logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
foreach ($r AS $pid) {
if (!posix_kill($pid["pid"], 0)) {
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
- array('pid' => $pid["pid"]));
+ array('pid' => $pid["pid"], 'done' => false));
} else {
// Kill long running processes
// Additionally we are lowering the priority.
dba::update('workerqueue',
array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
- array('pid' => $pid["pid"]));
+ array('pid' => $pid["pid"], 'done' => false));
} else {
logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
}
$listitem = array();
// Adding all processes with no workerqueue entry
- $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)");
+ $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS
+ (SELECT id FROM `workerqueue`
+ WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done` AND `pid` != ?)", getmypid());
if ($process = dba::fetch($processes)) {
$listitem[0] = "0:".$process["running"];
}
dba::close($processes);
}
dba::close($entries);
- $processlist = ' ('.implode(', ', $listitem).')';
+
+ $jobs_per_minute = 0;
+
+ $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE");
+ if ($job = dba::fetch($jobs)) {
+ $jobs_per_minute = number_format($job['jobs'] / 10, 0);
+ }
+ dba::close($jobs);
+
+ $processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
}
$entries = poller_total_entries();
// Check if we should pass some low priority process
$highest_priority = 0;
$found = false;
- $limit = Config::get('system', 'worker_fetch_limit', 5);
+
+ // The higher the number of parallel workers, the more we prefetch to prevent concurring access
+ $limit = Config::get("system", "worker_queues", 4) * 2;
+ $limit = Config::get('system', 'worker_fetch_limit', $limit);
if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest?
// Give slower processes some processing time
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
- ORDER BY `priority`, `created` LIMIT 1",
+ ORDER BY `priority`, `created` LIMIT ".intval($limit),
datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
if ($result) {
$found = (dba::affected_rows() > 0);
* @return string SQL statement
*/
function poller_worker_process() {
- global $poller_db_duration;
+ global $poller_db_duration, $poller_lock_duration;
$stamp = (float)microtime(true);
- $found = find_worker_processes();
+ // There can already be jobs for us in the queue.
+ $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
+ if (dbm::is_result($r)) {
+ $poller_db_duration += (microtime(true) - $stamp);
+ return $r;
+ }
+ $stamp = (float)microtime(true);
+ if (!Lock::set('poller_worker_process')) {
+ return false;
+ }
+ $poller_lock_duration = (microtime(true) - $stamp);
+
+ $stamp = (float)microtime(true);
+ $found = find_worker_processes();
$poller_db_duration += (microtime(true) - $stamp);
+ Lock::remove('poller_worker_process');
+
if ($found) {
$r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d AND NOT `done`", intval(getmypid()));
}
function poller_unclaim_process() {
$mypid = getmypid();
- dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid));
+ dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid, 'done' => false));
}
/**
get_app()->end_process();
Lock::remove('poller_worker');
+ Lock::remove('poller_worker_process');
killme();
}
* @brief This class contain Functions for preventing parallel execution of functions
*/
class Lock {
+ private static $semaphore = array();
+
/**
* @brief Check for memcache and open a connection if configured
*
return $memcache;
}
+ /**
+ * @brief Creates a semaphore key
+ *
+ * @param string $fn_name Name of the lock
+ *
+ * @return ressource the semaphore key
+ */
+ private static function semaphore_key($fn_name) {
+ $temp = get_temppath();
+
+ $file = $temp.'/'.$fn_name.'.sem';
+
+ if (!file_exists($file)) {
+ file_put_contents($file, $function);
+ }
+
+ return ftok($file, 'f');
+ }
+
/**
* @brief Sets a lock for a given name
*
$got_lock = false;
$start = time();
+ if (function_exists('sem_get')) {
+ self::$semaphore[$fn_name] = sem_get(self::semaphore_key($fn_name));
+ if (self::$semaphore[$fn_name]) {
+ return sem_acquire(self::$semaphore[$fn_name], ($timeout == 0));
+ }
+ }
+
$memcache = self::connectMemcache();
if (is_object($memcache)) {
$cachekey = get_app()->get_hostname().";lock:".$fn_name;
* @param string $fn_name Name of the lock
*/
public static function remove($fn_name) {
+ if (function_exists('sem_get') && self::$semaphore[$fn_name]) {
+ sem_release(self::$semaphore[$fn_name]);
+ return;
+ }
+
$memcache = self::connectMemcache();
if (is_object($memcache)) {
$cachekey = get_app()->get_hostname().";lock:".$fn_name;