X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fdbqueuemanager.php;h=750300928e435e0d1450b3206dea8cfdc56f9b14;hb=69ac99ff949ab0118ff25a62471980ad0ec7a52b;hp=46be54b30b5bf31dac52a9b75ddc27c66868efc6;hpb=58b427869a001a91d66cff497f1563b8277f1a67;p=quix0rs-gnu-social.git diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index 46be54b30b..750300928e 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -1,6 +1,6 @@ . * * @category QueueManager - * @package Laconica - * @author Evan Prodromou - * @author Sarven Capadisli - * @copyright 2009 Control Yourself, Inc. + * @package StatusNet + * @author Evan Prodromou + * @copyright 2009 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 - * @link http://laconi.ca/ + * @link http://status.net/ */ class DBQueueManager extends QueueManager @@ -51,14 +50,50 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $timeout = $handler->timeout(); + $notice = $this->_nextItem($queue, $timeout); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; + $sleeptime = 1; + do { $qi = Queue_item::top($queue); - if (!empty($qi)) { + if (empty($qi)) { + $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds."); + sleep($sleeptime); + $sleeptime *= 2; + } else { $notice = Notice::staticGet('id', $qi->notice_id); if (!empty($notice)) { $result = $notice; @@ -68,13 +103,14 @@ class DBQueueManager extends QueueManager $qi->free(); $qi = null; } + $sleeptime = 1; } } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -84,18 +120,47 @@ class DBQueueManager extends QueueManager 'transport' => $queue)); if (empty($qi)) { - $this->log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); } else { if (empty($qi->claimed)) { - $this->log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); } $qi->delete(); $qi->free(); $qi = null; } - $this->log(LOG_INFO, 'done with notice ID = ' . $notice->id); + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); + + $notice->free(); + $notice = null; + } + + function _fail($object, $queue) + { + // XXX: right now, we only handle notices + + $notice = $object; + + $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id, + 'transport' => $queue)); + + if (empty($qi)) { + $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue); + } else { + if (empty($qi->claimed)) { + $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. + 'for '.$notice->id.', queue '.$queue); + } else { + $orig = clone($qi); + $qi->claimed = null; + $qi->update($orig); + $qi = null; + } + } + + $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id); $notice->free(); $notice = null;