<?php
/**
- * Laconica, the distributed open-source microblogging tool
+ * StatusNet, the distributed open-source microblogging tool
*
* Simple-minded queue manager for storing items in the database
*
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @category QueueManager
- * @package Laconica
- * @author Evan Prodromou <evan@controlyourself.ca>
- * @author Sarven Capadisli <csarven@controlyourself.ca>
- * @copyright 2009 Control Yourself, Inc.
+ * @package StatusNet
+ * @author Evan Prodromou <evan@status.net>
+ * @copyright 2009 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link http://laconi.ca/
+ * @link http://status.net/
*/
class DBQueueManager extends QueueManager
function enqueue($object, $queue)
{
- $notice = (Notice)$object;
+ $notice = $object;
$qi = new Queue_item();
return true;
}
- function nextItem($queue, $timeout=null)
+ function service($queue, $handler)
+ {
+ while (true) {
+ $this->_log(LOG_DEBUG, 'Checking for notices...');
+ $timeout = $handler->timeout();
+ $notice = $this->_nextItem($queue, $timeout);
+ if (empty($notice)) {
+ $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ // Nothing in the queue. Do you
+ // have other tasks, like servicing your
+ // XMPP connection, to do?
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $this->_log(LOG_INFO, 'Got notice '. $notice->id);
+ // Yay! Got one!
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+ $this->_done($notice, $queue);
+ } else {
+ $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
+ $this->_fail($notice, $queue);
+ }
+ // Chance to e.g. service your XMPP connection
+ $this->_log(LOG_DEBUG, 'Idling after success.');
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ }
+ // XXX: when do we give up?
+ }
+ }
+
+ function _nextItem($queue, $timeout=null)
{
$start = time();
$result = null;
+ $sleeptime = 1;
+
do {
$qi = Queue_item::top($queue);
- if (!empty($qi)) {
+ if (empty($qi)) {
+ $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds.");
+ sleep($sleeptime);
+ $sleeptime *= 2;
+ } else {
$notice = Notice::staticGet('id', $qi->notice_id);
if (!empty($notice)) {
$result = $notice;
$qi->free();
$qi = null;
}
+ $sleeptime = 1;
}
} while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
return $result;
}
- function done($object, $queue)
+ function _done($object, $queue)
{
- $notice = (Notice)$object;
+ // XXX: right now, we only handle notices
+
+ $notice = $object;
$qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
'transport' => $queue));
if (empty($qi)) {
- $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
} else {
if (empty($qi->claimed)) {
- $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
+ $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
+ 'for '.$notice->id.', queue '.$queue);
}
$qi->delete();
$qi->free();
$qi = null;
}
- $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+ $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+
+ $notice->free();
+ $notice = null;
+ }
+
+ function _fail($object, $queue)
+ {
+ // XXX: right now, we only handle notices
+
+ $notice = $object;
+
+ $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+ 'transport' => $queue));
+
+ if (empty($qi)) {
+ $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ } else {
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
+ 'for '.$notice->id.', queue '.$queue);
+ } else {
+ $orig = clone($qi);
+ $qi->claimed = null;
+ $qi->update($orig);
+ $qi = null;
+ }
+ }
+
+ $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
$notice->free();
$notice = null;