X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=lib%2Fstompqueuemanager.php;h=f059b42f0095f69f5db827bbf3f5698d128f87ba;hb=9ef05030fe623f18c64f9d24da1ebff04cf6cb62;hp=e7e1e00dd9743aa82ff4d444306ff39067442b16;hpb=23e6dafff6d82492aa7ab2addc2fae99bd609b57;p=quix0rs-gnu-social.git diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php index e7e1e00dd9..f059b42f00 100644 --- a/lib/stompqueuemanager.php +++ b/lib/stompqueuemanager.php @@ -1,6 +1,6 @@ . * * @category QueueManager - * @package Laconica - * @author Evan Prodromou - * @author Sarven Capadisli - * @copyright 2009 Control Yourself, Inc. + * @package StatusNet + * @author Evan Prodromou + * @author Sarven Capadisli + * @copyright 2009 StatusNet, Inc. * @license http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0 - * @link http://laconi.ca/ + * @link http://status.net/ */ require_once 'Stomp.php'; +class LiberalStomp extends Stomp +{ + function getSocket() + { + return $this->_socket; + } +} + class StompQueueManager { var $server = null; @@ -50,7 +58,7 @@ class StompQueueManager { if (empty($this->con)) { $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'..."); - $this->con = new Stomp($this->server); + $this->con = new LiberalStomp($this->server); if ($this->con->connect($this->username, $this->password)) { $this->_log(LOG_INFO, "Connected."); @@ -94,27 +102,59 @@ class StompQueueManager while (true) { - $frame = $this->con->readFrame(); + // Wait for something on one of our sockets + + $stompsock = $this->con->getSocket(); - if (!empty($frame)) { - $notice = Notice::staticGet('id', $frame->body); + $handsocks = $handler->getSockets(); - if (empty($notice)) { - $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice'); + $socks = array_merge(array($stompsock), $handsocks); + + $read = $socks; + $write = array(); + $except = array(); + + $ready = stream_select($read, $write, $except, $handler->timeout(), 0); + + if ($ready === false) { + $this->_log(LOG_ERR, "Error selecting on sockets"); + } else if ($ready > 0) { + if (in_array($stompsock, $read)) { + $this->_handleNotice($queue, $handler); + } + $handler->idle(QUEUE_HANDLER_HIT_IDLE); + } + } + + $this->con->unsubscribe($this->_queueName($queue)); + } + + function _handleNotice($queue, $handler) + { + $frame = $this->con->readFrame(); + + if (!empty($frame)) { + $notice = Notice::staticGet('id', $frame->body); + + if (empty($notice)) { + $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue); + $this->con->ack($frame); + } else { + if ($handler->handle_notice($notice)) { + $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); $this->con->ack($frame); - } else if ($handler->handle_notice($notice)) { - $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']); + } else { + $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue); + // FIXME we probably shouldn't have to do + // this kind of queue management ourselves $this->con->ack($frame); - unset($notice); + $this->enqueue($notice, $queue); } - - unset($frame); + unset($notice); } - $handler->idle(0); + unset($frame); } - - $this->con->unsubscribe($this->_queueName($queue)); } function _queueName($queue)