X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fstompqueuemanager.php;h=f059b42f0095f69f5db827bbf3f5698d128f87ba;hb=69ac99ff949ab0118ff25a62471980ad0ec7a52b;hp=badcd4abb0fee788afc579b9940c85a7452f7da8;hpb=5f9a4ebef493997557ef4470268bed0e5799b6cb;p=quix0rs-gnu-social.git diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index badcd4abb0..f059b42f00 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -1,6 +1,6 @@ . * * @category QueueManager - * @package Laconica - * @author Evan Prodromou - * @author Sarven Capadisli - * @copyright 2009 Control Yourself, Inc. + * @package StatusNet + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 - * @link http://laconi.ca/ + * @link http://status.net/ */ require_once 'Stomp.php'; +class LiberalStomp extends Stomp +{ + function getSocket() + { + return $this->_socket; + } +} + class StompQueueManager { var $server = null; @@ -37,7 +45,6 @@ class StompQueueManager var $password = null; var $base = null; var $con = null; - var $frames = array(); function __construct() { @@ -50,9 +57,12 @@ class StompQueueManager function _connect() { if (empty($this->con)) { - $this->con = new Stomp($this->server); + $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); + $this->con = new LiberalStomp($this->server); - if (!$this->con->connect($this->username, $this->password)) { + if ($this->con->connect($this->username, $this->password)) { + $this->_log(LOG_INFO, "Connected."); + } else { $this->_log(LOG_ERR, 'Failed to connect to queue server'); throw new ServerException('Failed to connect to queue server'); } @@ -72,92 +82,88 @@ class StompQueueManager array ('created' => $notice->created)); if (!$result) { - common_log(LOG_ERR, 'Error sending to '.$transport.' queue'); + common_log(LOG_ERR, 'Error sending to '.$queue.' queue'); return false; } common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' - . $notice->id . ' for ' . $transport); + . $notice->id . ' for ' . $queue); } - function nextItem($queue, $timeout=null) + function service($queue, $handler) { $result = null; $this->_connect(); - $frame = $this->con->readFrame(); + $this->con->setReadTimeout($handler->timeout()); - if ($frame) { - $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created'])); + $this->con->subscribe($this->_queueName($queue)); - // XXX: Now the queue handler receives only the ID of the - // notice, and it has to get it from the DB - // A massive improvement would be avoid DB query by transmitting - // all the notice details via queue server... + while (true) { - $notice = Notice::staticGet($frame->body); + // Wait for something on one of our sockets - if ($notice) { - $this->_saveFrame($notice, $queue, $frame); - } else { - $this->log(LOG_WARNING, 'queue item for notice that does not exist'); - } - } - } + $stompsock = $this->con->getSocket(); - function done($object, $queue) - { - $notice = $object; + $handsocks = $handler->getSockets(); - $this->_connect(); + $socks = array_merge(array($stompsock), $handsocks); - $frame = $this->_getFrame($notice, $queue); + $read = $socks; + $write = array(); + $except = array(); - if (empty($frame)) { - $this->log(LOG_ERR, 'Cannot find frame for notice '.$notice->id.' in queue '.$queue); - } else { - // if the msg has been handled positively, ack it - // and the queue server will remove it from the queue - $this->con->ack($frame); - $this->_clearFrame($notice, $queue); + $ready = stream_select($read, $write, $except, $handler->timeout(), 0); - $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id); + if ($ready === false) { + $this->_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + if (in_array($stompsock, $read)) { + $this->_handleNotice($queue, $handler); + } + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } } + + $this->con->unsubscribe($this->_queueName($queue)); } - function fail($object, $queue) + function _handleNotice($queue, $handler) { - $notice = $object; - - // STOMP server will requeue it after a while anyways, - // so no need to notify. Just get it out of our little - // array + $frame = $this->con->readFrame(); - $this->_clearFrame($notice, $queue); - } + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); - function _frameKey($notice, $queue) - { - return ((string)$notice->id) . '-' . $queue; - } + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); + $this->con->ack($frame); + } else { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + $this->con->ack($frame); + } else { + $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves + $this->con->ack($frame); + $this->enqueue($notice, $queue); + } + unset($notice); + } - function _saveFrame($notice, $queue, $frame) - { - $k = $this->_frameKey($notice, $queue); - $this->_frames[$k] = $frame; - return true; + unset($frame); + } } - function _getFrame($notice, $queue) + function _queueName($queue) { - $k = $this->_frameKey($notice, $queue); - return $this->_frames[$k]; + return common_config('queue', 'queue_basename') . $queue; } - function _clearFrame($notice, $queue) + function _log($level, $msg) { - $k = $this->_frameKey($notice, $queue); - unset($this->_frames[$k]); + common_log($level, 'StompQueueManager: '.$msg); } }