X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fdbqueuemanager.php;h=750300928e435e0d1450b3206dea8cfdc56f9b14;hb=acaf07f6e8c873e0069e84dac74bac3c7da98a97;hp=c9e5ef243f9a2b8beb15d5b57bff60f0dbbe1a50;hpb=741eb1a28bbe505364baa440539bbc5ed6fbe5e0;p=quix0rs-gnu-social.git diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php index c9e5ef243f..750300928e 100644 --- a/lib/dbqueuemanager.php +++ b/lib/dbqueuemanager.php @@ -1,6 +1,6 @@ . * * @category QueueManager - * @package Laconica - * @author Evan Prodromou - * @author Sarven Capadisli - * @copyright 2009 Control Yourself, Inc. + * @package StatusNet + * @author Evan Prodromou + * @copyright 2009 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 - * @link http://laconi.ca/ + * @link http://status.net/ */ class DBQueueManager extends QueueManager @@ -51,14 +50,50 @@ class DBQueueManager extends QueueManager return true; } - function nextItem($queue, $timeout=null) + function service($queue, $handler) + { + while (true) { + $this->_log(LOG_DEBUG, 'Checking for notices...'); + $timeout = $handler->timeout(); + $notice = $this->_nextItem($queue, $timeout); + if (empty($notice)) { + $this->_log(LOG_DEBUG, 'No notices waiting; idling.'); + // Nothing in the queue. Do you + // have other tasks, like servicing your + // XMPP connection, to do? + $handler->idle(QUEUE_HANDLER_MISS_IDLE); + } else { + $this->_log(LOG_INFO, 'Got notice '. $notice->id); + // Yay! Got one! + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id); + $this->_done($notice, $queue); + } else { + $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id); + $this->_fail($notice, $queue); + } + // Chance to e.g. service your XMPP connection + $this->_log(LOG_DEBUG, 'Idling after success.'); + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + // XXX: when do we give up? + } + } + + function _nextItem($queue, $timeout=null) { $start = time(); $result = null; + $sleeptime = 1; + do { $qi = Queue_item::top($queue); - if (!empty($qi)) { + if (empty($qi)) { + $this->_log(LOG_DEBUG, "No new queue items, sleeping $sleeptime seconds."); + sleep($sleeptime); + $sleeptime *= 2; + } else { $notice = Notice::staticGet('id', $qi->notice_id); if (!empty($notice)) { $result = $notice; @@ -68,13 +103,14 @@ class DBQueueManager extends QueueManager $qi->free(); $qi = null; } + $sleeptime = 1; } } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout)); return $result; } - function done($object, $queue) + function _done($object, $queue) { // XXX: right now, we only handle notices @@ -88,7 +124,7 @@ class DBQueueManager extends QueueManager } else { if (empty($qi->claimed)) { $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + 'for '.$notice->id.', queue '.$queue); } $qi->delete(); $qi->free(); @@ -101,7 +137,7 @@ class DBQueueManager extends QueueManager $notice = null; } - function fail($object, $queue) + function _fail($object, $queue) { // XXX: right now, we only handle notices @@ -115,7 +151,7 @@ class DBQueueManager extends QueueManager } else { if (empty($qi->claimed)) { $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '. - 'for '.$notice->id.', queue '.$queue); + 'for '.$notice->id.', queue '.$queue); } else { $orig = clone($qi); $qi->claimed = null;