self::$dbo->errorno = mysql_errno(self::$dbo->db);
} else {
self::$dbo->affected_rows = mysql_affected_rows($retval);
+
+ // Due to missing mysql_* support this here wasn't tested at all
+ // See here: http://php.net/manual/en/function.mysql-num-rows.php
+ if (self::$dbo->affected_rows <= 0) {
+ self::$dbo->affected_rows = mysql_num_rows($retval);
+ }
}
break;
}
$sql = "DELETE FROM `".$command['table']."` WHERE `".
implode("` = ? AND `", array_keys($command['param']))."` = ?";
- logger(dba::replace_parameters($sql, $command['param']), LOGGER_DATA);
+ logger(self::replace_parameters($sql, $command['param']), LOGGER_DATA);
if (!self::e($sql, $command['param'])) {
if ($do_transaction) {
$sql = "DELETE FROM `".$table."` WHERE `".$field."` IN (".
substr(str_repeat("?, ", count($field_values)), 0, -2).");";
- logger(dba::replace_parameters($sql, $field_values), LOGGER_DATA);
+ logger(self::replace_parameters($sql, $field_values), LOGGER_DATA);
if (!self::e($sql, $field_values)) {
if ($do_transaction) {
require_once("boot.php");
function poller_run($argv, $argc){
- global $a, $db, $poller_up_start;
+ global $a, $db, $poller_up_start, $poller_db_duration;
$poller_up_start = microtime(true);
// If possible we will fetch new jobs for this worker
if (!$refetched && Lock::set('poller_worker_process', 0)) {
- logger('Blubb: a');
+ $stamp = (float)microtime(true);
$refetched = find_worker_processes();
+ $poller_db_duration += (microtime(true) - $stamp);
Lock::remove('poller_worker_process');
}
}
}
dba::close($entries);
- $jobs_per_minute = 0;
-
- $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL 10 MINUTE");
- if ($job = dba::fetch($jobs)) {
- $jobs_per_minute = number_format($job['jobs'] / 10, 0);
+ $intervals = array(1, 10, 60);
+ $jobs_per_minute = array();
+ foreach ($intervals AS $interval) {
+ $jobs = dba::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ".intval($interval)." MINUTE");
+ if ($job = dba::fetch($jobs)) {
+ $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0);
+ }
+ dba::close($jobs);
}
- dba::close($jobs);
-
- $processlist = ' - jpm: '.$jobs_per_minute.' ('.implode(', ', $listitem).')';
+ $processlist = ' - jpm: '.implode('/', $jobs_per_minute).' ('.implode(', ', $listitem).')';
}
$entries = poller_total_entries();
if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest?
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
+ $result = dba::p("SELECT `id` FROM `workerqueue`
WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), $mypid, NULL_DATE, $highest_priority);
- if ($result) {
- $found = (dba::affected_rows() > 0);
+ NULL_DATE, $highest_priority);
+
+ while ($id = dba::fetch($result)) {
+ $ids[] = $id["id"];
}
+ dba::close($result);
+
+ $found = (count($ids) > 0);
if (!$found) {
// Give slower processes some processing time
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
+ $result = dba::p("SELECT `id` FROM `workerqueue`
WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), $mypid, NULL_DATE, $highest_priority);
- if ($result) {
- $found = (dba::affected_rows() > 0);
+ NULL_DATE, $highest_priority);
+
+ while ($id = dba::fetch($result)) {
+ $ids[] = $id["id"];
}
+ dba::close($result);
+
+ $found = (count($ids) > 0);
}
}
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
if (!$found) {
- $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
- datetime_convert(), $mypid, NULL_DATE);
- if ($result) {
- $found = (dba::affected_rows() > 0);
+ $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE);
+
+ while ($id = dba::fetch($result)) {
+ $ids[] = $id["id"];
}
+ dba::close($result);
+
+ $found = (count($ids) > 0);
+ }
+
+ if ($found) {
+ $sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;";
+ array_unshift($ids, datetime_convert(), $mypid);
+ dba::e($sql, $ids);
}
+
return $found;
}