poller_run_cron();
}
- $refetched = false;
-
$starttime = time();
// We fetch the next queue entry that is about to be executed
while ($r = poller_worker_process()) {
+
+ $refetched = false;
+
foreach ($r AS $entry) {
// Assure that the priority is an integer value
$entry['priority'] = (int)$entry['priority'];
logger('Process execution failed, quitting.', LOGGER_DEBUG);
return;
}
+
+ // If possible we will fetch new jobs for this worker
+ if (!$refetched && Lock::set('poller_worker_process', 0)) {
+ logger('Blubb: a');
+ $refetched = find_worker_processes();
+ Lock::remove('poller_worker_process');
+ }
}
// To avoid the quitting of multiple pollers only one poller at a time will execute the check
logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
return;
}
-
- // If possible we will fetch new jobs for this worker
- if (!$refetched && Lock::set('poller_worker_process', 0)) {
- $refetched = find_worker_processes();
- Lock::remove('poller_worker_process');
- }
-
}
logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
}
if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) {
logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
$args = array("include/poller.php", "no_cron");
- $a = get_app();
- $a->proc_run($args);
+ get_app()->proc_run($args);
}
}
*
* @return boolean Have we found something?
*/
-function find_worker_processes() {
+function find_worker_processes($mypid = 0) {
+
+ if ($mypid == 0) {
+ $mypid = getmypid();
+ }
+
// Check if we should pass some low priority process
$highest_priority = 0;
$found = false;
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
+ datetime_convert(), $mypid, NULL_DATE, $highest_priority);
if ($result) {
$found = (dba::affected_rows() > 0);
}
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
+ datetime_convert(), $mypid, NULL_DATE, $highest_priority);
if ($result) {
$found = (dba::affected_rows() > 0);
}
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
if (!$found) {
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), getmypid(), NULL_DATE);
+ datetime_convert(), $mypid, NULL_DATE);
if ($result) {
$found = (dba::affected_rows() > 0);
}
logger('Call poller', LOGGER_DEBUG);
$args = array("include/poller.php", "no_cron");
- $a = get_app();
- $a->proc_run($args);
+ get_app()->proc_run($args);
return;
}
get_app()->end_process();
- Lock::remove('poller_worker');
- Lock::remove('poller_worker_process');
-
killme();
}