* @category QueueManager
* @package StatusNet
* @author Evan Prodromou <evan@status.net>
- * @copyright 2009 StatusNet, Inc.
+ * @author Brion Vibber <brion@status.net>
+ * @copyright 2009-2010 StatusNet, Inc.
* @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
* @link http://status.net/
*/
class DBQueueManager extends QueueManager
{
- var $qis = array();
-
- function enqueue($object, $queue)
+ /**
+ * Saves an object reference into the queue item table.
+ * @return boolean true on success
+ * @throws ServerException on failure
+ */
+ public function enqueue($object, $queue)
{
- $notice = $object;
-
$qi = new Queue_item();
- $qi->notice_id = $notice->id;
+ $qi->frame = $this->encode($object);
$qi->transport = $queue;
- $qi->created = $notice->created;
+ $qi->created = common_sql_now();
$result = $qi->insert();
- if (!$result) {
+ if ($result === false) {
common_log_db_error($qi, 'INSERT', __FILE__);
throw new ServerException('DB error inserting queue item');
}
+ $this->stats('enqueued', $queue);
+
return true;
}
- function service($queue, $handler)
+ /**
+ * Poll every 10 seconds for new events during idle periods.
+ * We'll look in more often when there's data available.
+ *
+ * @return int seconds
+ */
+ public function pollInterval()
{
- while (true) {
- $this->_log(LOG_DEBUG, 'Checking for notices...');
- $timeout = $handler->timeout();
- $notice = $this->_nextItem($queue, $timeout);
- if (empty($notice)) {
- $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
- // Nothing in the queue. Do you
- // have other tasks, like servicing your
- // XMPP connection, to do?
- $handler->idle(QUEUE_HANDLER_MISS_IDLE);
- } else {
- $this->_log(LOG_INFO, 'Got notice '. $notice->id);
- // Yay! Got one!
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
- $this->_done($notice, $queue);
- } else {
- $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
- $this->_fail($notice, $queue);
- }
- // Chance to e.g. service your XMPP connection
- $this->_log(LOG_DEBUG, 'Idling after success.');
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
- }
- // XXX: when do we give up?
- }
+ return 10;
}
- function _nextItem($queue, $timeout=null)
+ /**
+ * Run a polling cycle during idle processing in the input loop.
+ * @return boolean true if we should poll again for more data immediately
+ */
+ public function poll()
{
- $start = time();
- $result = null;
+ //$this->_log(LOG_DEBUG, 'Checking for notices...');
+ $qi = Queue_item::top($this->activeQueues());
+ if (!$qi instanceof Queue_item) {
+ //$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+ return false;
+ }
- $sleeptime = 1;
+ try {
+ $item = $this->decode($qi->frame);
+ } catch (Exception $e) {
+ $this->_log(LOG_INFO, "[{$qi->transport}] Discarding: ".$e->getMessage());
+ $this->_done($qi);
+ return true;
+ }
- do {
- $qi = Queue_item::top($queue);
- if (empty($qi)) {
- $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds.");
- sleep($sleeptime);
- $sleeptime *= 2;
+ $rep = $this->logrep($item);
+ $this->_log(LOG_DEBUG, "Got {$rep} for transport {$qi->transport}");
+
+ $handler = $this->getHandler($qi->transport);
+ if ($handler) {
+ if ($handler->handle($item)) {
+ $this->_log(LOG_INFO, "[{$qi->transport}:$rep] Successfully handled item");
+ $this->_done($qi);
} else {
- $notice = Notice::staticGet('id', $qi->notice_id);
- if (!empty($notice)) {
- $result = $notice;
- } else {
- $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
- $qi->delete();
- $qi->free();
- $qi = null;
- }
- $sleeptime = 1;
+ $this->_log(LOG_INFO, "[{$qi->transport}:$rep] Failed to handle item");
+ $this->_fail($qi);
}
- } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
+ } else {
+ $this->noHandlerFound($qi, $rep);
+ }
+ return true;
+ }
- return $result;
+ // What to do if no handler was found. For example, the OpportunisticQM
+ // should avoid deleting items just because it can't reach XMPP queues etc.
+ protected function noHandlerFound(Queue_item $qi, $rep=null) {
+ $this->_log(LOG_INFO, "[{$qi->transport}:{$rep}] No handler for queue {$qi->transport}; discarding.");
+ $this->_done($qi);
}
- function _done($object, $queue)
+ /**
+ * Delete our claimed item from the queue after successful processing.
+ *
+ * @param QueueItem $qi
+ */
+ protected function _done(Queue_item $qi)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
-
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
-
- if (empty($qi)) {
- $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
- } else {
- if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
- }
- $qi->delete();
- $qi->free();
- $qi = null;
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item {$qi->id} from {$qi->transport}");
}
+ $qi->delete();
- $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
-
- $notice->free();
- $notice = null;
+ $this->stats('handled', $qi->transport);
}
- function _fail($object, $queue)
+ /**
+ * Free our claimed queue item for later reprocessing in case of
+ * temporary failure.
+ *
+ * @param QueueItem $qi
+ */
+ protected function _fail(Queue_item $qi, $releaseOnly=false)
{
- // XXX: right now, we only handle notices
-
- $notice = $object;
-
- $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
- 'transport' => $queue));
-
- if (empty($qi)) {
- $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+ if (empty($qi->claimed)) {
+ $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Ignoring failure for unclaimed queue item");
} else {
- if (empty($qi->claimed)) {
- $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
- 'for '.$notice->id.', queue '.$queue);
- } else {
- $orig = clone($qi);
- $qi->claimed = null;
- $qi->update($orig);
- $qi = null;
- }
+ $qi->releaseClaim();
}
- $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
-
- $notice->free();
- $notice = null;
- }
-
- function _log($level, $msg)
- {
- common_log($level, 'DBQueueManager: '.$msg);
+ if (!$releaseOnly) {
+ $this->stats('error', $qi->transport);
+ }
}
}