return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
}
+class Sharing_XMPP extends XMPPHP_XMPP
+{
+ function getSocket()
+ {
+ return $this->socket;
+ }
+}
+
/**
* connect the configured Jabber account to the configured server
*
{
static $conn = null;
if (!$conn) {
- $conn = new XMPPHP_XMPP(common_config('xmpp', 'host') ?
+ $conn = new Sharing_XMPP(common_config('xmpp', 'host') ?
common_config('xmpp', 'host') :
common_config('xmpp', 'server'),
common_config('xmpp', 'port'),
require_once 'Stomp.php';
+class LiberalStomp extends Stomp
+{
+ function getSocket()
+ {
+ return $this->_socket;
+ }
+}
+
class StompQueueManager
{
var $server = null;
{
if (empty($this->con)) {
$this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
- $this->con = new Stomp($this->server);
+ $this->con = new LiberalStomp($this->server);
if ($this->con->connect($this->username, $this->password)) {
$this->_log(LOG_INFO, "Connected.");
while (true) {
- $frame = $this->con->readFrame();
+ // Wait for something on one of our sockets
- if (!empty($frame)) {
- $notice = Notice::staticGet('id', $frame->body);
+ $stompsock = $this->con->getSocket();
- if (empty($notice)) {
- $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
- $this->con->ack($frame);
- } else {
- if ($handler->handle_notice($notice)) {
- $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $handsocks = $handler->getSockets();
+
+ $this->_log(LOG_DEBUG, "Got ".count($handsocks)." sockets from handler.");
+ $this->_log(LOG_DEBUG, print_r($handsocks, true));
+
+ $socks = array_merge(array($stompsock), $handsocks);
+
+ $read = $socks;
+ $write = array();
+ $except = array();
+
+ $this->_log(LOG_DEBUG, "Starting select");
+ $ready = stream_select($read, $write, $except, $handler->timeout(), 0);
+ $this->_log(LOG_DEBUG, "Finished select with value '$ready'");
+
+ if (!$ready || $read[0] !== $stompsock) {
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $frame = $this->con->readFrame();
+
+ if (!empty($frame)) {
+ $notice = Notice::staticGet('id', $frame->body);
+
+ if (empty($notice)) {
+ $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice in queue '. $queue);
$this->con->ack($frame);
} else {
- $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
- // FIXME we probably shouldn't have to do
- // this kind of queue management ourselves
- $this->con->ack($frame);
- $this->enqueue($notice, $queue);
+ if ($handler->handle_notice($notice)) {
+ $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ $this->con->ack($frame);
+ } else {
+ $this->_log(LOG_WARNING, 'Failed handling notice '. $notice->id .' posted at ' . $frame->headers['created'] . ' in queue '. $queue);
+ // FIXME we probably shouldn't have to do
+ // this kind of queue management ourselves
+ $this->con->ack($frame);
+ $this->enqueue($notice, $queue);
+ }
+ unset($notice);
}
- unset($notice);
- }
- unset($frame);
+ unset($frame);
- $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+ $handler->idle(QUEUE_HANDLER_HIT_IDLE);
- } else {
- $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ } else {
+ $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+ }
}
}