require_once("boot.php");
function poller_run($argv, $argc){
- global $a, $db;
+ global $a, $db, $poller_up_start;
+
+ $poller_up_start = microtime(true);
$a = new App(dirname(__DIR__));
* @return integer Number of non executed entries in the worker queue
*/
function poller_total_entries() {
- $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
+ $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done`", dbesc(NULL_DATE));
if (dbm::is_result($s)) {
return $s[0]["total"];
} else {
* @return integer Number of active poller processes
*/
function poller_highest_priority() {
- $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
+ $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done` ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
if (dbm::is_result($s)) {
return $s[0]["priority"];
} else {
* @return integer Is there a process running with that priority?
*/
function poller_process_with_priority_active($priority) {
- $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
+ $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' AND NOT `done` LIMIT 1",
intval($priority), dbesc(NULL_DATE));
return dbm::is_result($s);
}
* @return boolean "true" if further processing should be stopped
*/
function poller_execute($queue) {
+ global $poller_db_duration;
$a = get_app();
if (function_exists($funcname)) {
poller_exec_function($queue, $funcname, $argv);
- dba::delete('workerqueue', array('id' => $queue["id"]));
+
+ $stamp = (float)microtime(true);
+ dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]));
+ $poller_db_duration = (microtime(true) - $stamp);
} else {
logger("Function ".$funcname." does not exist");
+ dba::delete('workerqueue', array('id' => $queue["id"]));
}
return true;
* @param array $argv Array of values to be passed to the function
*/
function poller_exec_function($queue, $funcname, $argv) {
+ global $poller_up_start, $poller_db_duration;
$a = get_app();
$a->process_id = uniqid("wrk", true);
$a->queue = $queue;
+ $up_duration = number_format(microtime(true) - $poller_up_start, 3);
+
$funcname($argv, $argc);
$a->process_id = $old_process_id;
$duration = number_format(microtime(true) - $stamp, 3);
+ $poller_up_start = microtime(true);
+
+ /** With these values we can analyze how effective the worker is.
+ * The database and rest time should be low since this is the unproductive time.
+ * The execution time is the productive time.
+ * By changing parameters like the maximum number of workers we can check the effectivness.
+ */
+ logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
+
if ($duration > 3600) {
logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
} elseif ($duration > 600) {
*
*/
function poller_kill_stale_workers() {
- $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
+ $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE));
if (!dbm::is_result($r)) {
// No processing here needed
$listitem = array();
// Adding all processes with no workerqueue entry
- $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
+ $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)");
if ($process = dba::fetch($processes)) {
$listitem[0] = "0:".$process["running"];
}
dba::close($processes);
// Now adding all processes with workerqueue entries
- $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
+ $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` AND NOT `done` GROUP BY `priority`");
while ($entry = dba::fetch($entries)) {
- $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
+ $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` WHERE `priority` = ?", $entry["priority"]);
if ($process = dba::fetch($processes)) {
$listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
}
$r = q("SELECT `priority`
FROM `process`
- INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
+ INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`");
// No active processes at all? Fine
if (!dbm::is_result($r)) {
// Check if we should pass some low priority process
$highest_priority = 0;
$found = false;
+ $limit = Config::get('system', 'worker_fetch_limit', 5);
if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest?
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
- WHERE `executed` <= ? AND `priority` < ?
- ORDER BY `priority`, `created` LIMIT 5",
+ WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
+ ORDER BY `priority`, `created` LIMIT ".intval($limit),
datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
if ($result) {
$found = (dba::affected_rows() > 0);
if (!$found) {
// Give slower processes some processing time
$result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
- WHERE `executed` <= ? AND `priority` > ?
+ WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT 1",
datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
if ($result) {
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
if (!$found) {
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? ORDER BY `priority`, `created` LIMIT 5",
+ $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
datetime_convert(), getmypid(), NULL_DATE);
if ($result) {
$found = (dba::affected_rows() > 0);
* @return string SQL statement
*/
function poller_worker_process() {
+ global $poller_db_duration;
$stamp = (float)microtime(true);
$found = find_worker_processes();
- logger('Duration: '.number_format(microtime(true) - $stamp, 3), LOGGER_DEBUG);
+ $poller_db_duration += (microtime(true) - $stamp);
if ($found) {
$r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d", intval(getmypid()));