<?php
/**
- * Laconica, the distributed open-source microblogging tool
+ * StatusNet, the distributed open-source microblogging tool
*
* Simple-minded queue manager for storing items in the database
*
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @category QueueManager
- * @package Laconica
- * @author Evan Prodromou <evan@controlyourself.ca>
- * @copyright 2009 Control Yourself, Inc.
+ * @package StatusNet
+ * @author Evan Prodromou <evan@status.net>
+ * @author Brion Vibber <brion@status.net>
+ * @copyright 2009-2010 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link http://laconi.ca/
+ * @link http://status.net/
*/
class DBQueueManager extends QueueManager
{
- var $qis = array();
-
- function enqueue($object, $queue)
+ /**
+ * Saves an object reference into the queue item table.
+ * @return boolean true on success
+ * @throws ServerException on failure
+ */
+ public function enqueue($object, $queue)
{
- $notice = $object;
-
$qi = new Queue_item();
- $qi->notice_id = $notice->id;
+ $qi->frame = $this->encode($object);
$qi->transport = $queue;
- $qi->created = $notice->created;
+ $qi->created = common_sql_now();
$result = $qi->insert();
if (!$result) {
throw new ServerException('DB error inserting queue item');
}
+ $this->stats('enqueued', $queue);
+
return true;
}
- function service($queue, $handler)
+ /**
+ * Poll every 10 seconds for new events during idle periods.
+ * We'll look in more often when there's data available.
+ *
+ * @return int seconds
+ */
+ public function pollInterval()
{
- while (true) {
- $this->_log(LOG_DEBUG, 'Checking for notices...');
- $timeout = $handler->timeout();
- $notice = $this->_nextItem($queue, $timeout);
- if (empty($notice)) {
- $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
- // Nothing in the queue. Do you
- // have other tasks, like servicing your
- // XMPP connection, to do?
- $handler->idle(QUEUE_HANDLER_MISS_IDLE);
- } else {
- $this->_log(LOG_INFO, 'Got notice '. $notice->id);
- // Yay! Got one!
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
- $this->_done($notice, $queue);
- } else {
- $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
- $this->_fail($notice, $queue);
- }
- // Chance to e.g. service your XMPP connection
- $this->_log(LOG_DEBUG, 'Idling after success.');
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
- }
- // XXX: when do we give up?
- }
+ return 10;
}
- function _nextItem($queue, $timeout=null)
+ /**
+ * Run a polling cycle during idle processing in the input loop.
+ * @return boolean true if we should poll again for more data immediately
+ */
+ public function poll()
{
- $start = time();
- $result = null;
-
- $sleeptime = 1;
+ //$this->_log(LOG_DEBUG, 'Checking for notices...');
+ $qi = Queue_item::top($this->activeQueues());
+ if (empty($qi)) {
+ //$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ return false;
+ }
- do {
- $qi = Queue_item::top($queue);
- if (empty($qi)) {
- $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds.");
- sleep($sleeptime);
- $sleeptime *= 2;
- } else {
- $notice = Notice::staticGet('id', $qi->notice_id);
- if (!empty($notice)) {
- $result = $notice;
+ $queue = $qi->transport;
+ $item = $this->decode($qi->frame);
+
+ if ($item) {
+ $rep = $this->logrep($item);
+ $this->_log(LOG_INFO, "Got $rep for transport $queue");
+
+ $handler = $this->getHandler($queue);
+ if ($handler) {
+ if ($handler->handle($item)) {
+ $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
+ $this->_done($qi);
} else {
- $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
- $qi->delete();
- $qi->free();
- $qi = null;
+ $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
+ $this->_fail($qi);
}
- $sleeptime = 1;
+ } else {
+ $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
+ $this->_done($qi);
}
- } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
-
- return $result;
+ } else {
+ $this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
+ $this->_done($qi);
+ }
+ return true;
}
- function _done($object, $queue)
+ /**
+ * Delete our claimed item from the queue after successful processing.
+ *
+ * @param QueueItem $qi
+ */
+ protected function _done($qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
+ $queue = $qi->transport;
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
-
- if (empty($qi)) {
- $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
- } else {
- if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
- }
- $qi->delete();
- $qi->free();
- $qi = null;
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue");
}
+ $qi->delete();
- $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
-
- $notice->free();
- $notice = null;
+ $this->stats('handled', $queue);
}
- function _fail($object, $queue)
+ /**
+ * Free our claimed queue item for later reprocessing in case of
+ * temporary failure.
+ *
+ * @param QueueItem $qi
+ */
+ protected function _fail($qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
+ $queue = $qi->transport;
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
-
- if (empty($qi)) {
- $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item");
} else {
- if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
- } else {
- $orig = clone($qi);
- $qi->claimed = null;
- $qi->update($orig);
- $qi = null;
- }
+ $qi->releaseClaim();
}
- $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
-
- $notice->free();
- $notice = null;
- }
-
- function _log($level, $msg)
- {
- common_log($level, 'DBQueueManager: '.$msg);
+ $this->stats('error', $queue);
}
}