+ /**
+ * Send any sockets we're listening on to the IO manager
+ * to wait for input.
+ *
+ * @return array of resources
+ */
+ public function getSockets()
+ {
+ $sockets = array();
+ foreach ($this->cons as $con) {
+ if ($con) {
+ $sockets[] = $con->getSocket();
+ }
+ }
+ return $sockets;
+ }
+
+ /**
+ * Get the Stomp connection object associated with the given socket.
+ * @param resource $socket
+ * @return int index into connections list
+ * @throws Exception
+ */
+ protected function connectionFromSocket($socket)
+ {
+ foreach ($this->cons as $i => $con) {
+ if ($con && $con->getSocket() === $socket) {
+ return $i;
+ }
+ }
+ throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
+ }
+
+ /**
+ * We've got input to handle on our socket!
+ * Read any waiting Stomp frame(s) and process them.
+ *
+ * @param resource $socket
+ * @return boolean ok on success
+ */
+ public function handleInput($socket)
+ {
+ $idx = $this->connectionFromSocket($socket);
+ $con = $this->cons[$idx];
+ $host = $con->getServer();
+
+ $ok = true;
+ try {
+ $frames = $con->readFrames();
+ } catch (StompException $e) {
+ common_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage());
+ $this->cons[$idx] = null;
+ $this->transaction[$idx] = null;
+ $this->disconnect[$idx] = time();
+ return false;
+ }
+ foreach ($frames as $frame) {
+ $dest = $frame->headers['destination'];
+ if ($dest == $this->control) {
+ if (!$this->handleControlSignal($idx, $frame)) {
+ // We got a control event that requests a shutdown;
+ // close out and stop handling anything else!
+ break;
+ }
+ } else {
+ $ok = $ok && $this->handleItem($idx, $frame);
+ }
+ }
+ return $ok;
+ }
+
+ /**
+ * Attempt to reconnect in background if we lost a connection.
+ */
+ function idle()
+ {
+ $now = time();
+ foreach ($this->cons as $idx => $con) {
+ if (empty($con)) {
+ $age = $now - $this->disconnect[$idx];
+ if ($age >= 60) {
+ $this->_reconnect($idx);
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Initialize our connection and subscribe to all the queues
+ * we're going to need to handle... If multiple queue servers
+ * are configured for failover, we'll listen to all of them.
+ *
+ * Side effects: in multi-site mode, may reset site configuration.
+ *
+ * @param IoMaster $master process/event controller
+ * @return bool return false on failure
+ */
+ public function start($master)
+ {
+ parent::start($master);
+ $this->_connectAll();
+
+ common_log(LOG_INFO, "Subscribing to $this->control");
+ foreach ($this->cons as $con) {
+ if ($con) {
+ $con->subscribe($this->control);
+ }
+ }
+ if ($this->sites) {
+ foreach ($this->sites as $server) {
+ StatusNet::init($server);
+ $this->doSubscribe();
+ }
+ } else {
+ $this->doSubscribe();
+ }
+ foreach ($this->cons as $i => $con) {
+ if ($con) {
+ $this->begin($i);
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Subscribe to all the queues we're going to need to handle...
+ *
+ * Side effects: in multi-site mode, may reset site configuration.
+ *
+ * @return bool return false on failure
+ */
+ public function finish()
+ {
+ // If there are any outstanding delivered messages we haven't processed,
+ // free them for another thread to take.
+ foreach ($this->cons as $i => $con) {
+ if ($con) {
+ $this->rollback($i);
+ $con->unsubscribe($this->control);
+ }
+ }
+ if ($this->sites) {
+ foreach ($this->sites as $server) {
+ StatusNet::init($server);
+ $this->doUnsubscribe();
+ }
+ } else {
+ $this->doUnsubscribe();
+ }
+ return true;
+ }
+
+ /**
+ * Get identifier of the currently active site configuration
+ * @return string
+ */
+ protected function currentSite()
+ {
+ return common_config('site', 'server'); // @fixme switch to nickname
+ }
+
+ /**
+ * Lazy open a single connection to Stomp queue server.
+ * If multiple servers are configured, we let the Stomp client library
+ * worry about finding a working connection among them.
+ */
+ protected function _connect()
+ {
+ if (empty($this->cons)) {
+ $list = $this->servers;
+ if (count($list) > 1) {
+ shuffle($list); // Randomize to spread load
+ $url = 'failover://(' . implode(',', $list) . ')';
+ } else {
+ $url = $list[0];
+ }
+ $con = $this->_doConnect($url);
+ $this->cons = array($con);
+ $this->transactionCount = array(0);
+ $this->transaction = array(null);
+ $this->disconnect = array(null);
+ }
+ }
+
+ /**
+ * Lazy open connections to all Stomp servers, if in manual failover
+ * mode. This means the queue servers don't speak to each other, so
+ * we have to listen to all of them to make sure we get all events.
+ */
+ protected function _connectAll()
+ {
+ if (!common_config('queue', 'stomp_manual_failover')) {
+ return $this->_connect();
+ }
+ if (empty($this->cons)) {
+ $this->cons = array();
+ $this->transactionCount = array();
+ $this->transaction = array();
+ foreach ($this->servers as $idx => $server) {
+ try {
+ $this->cons[] = $this->_doConnect($server);
+ $this->disconnect[] = null;
+ } catch (Exception $e) {
+ // s'okay, we'll live
+ $this->cons[] = null;
+ $this->disconnect[] = time();
+ }
+ $this->transactionCount[] = 0;
+ $this->transaction[] = null;
+ }
+ if (empty($this->cons)) {
+ throw new ServerException("No queue servers reachable...");
+ return false;
+ }
+ }
+ }