+ /**
+ * Send any sockets we're listening on to the IO manager
+ * to wait for input.
+ *
+ * @return array of resources
+ */
+ public function getSockets()
+ {
+ $sockets = array();
+ foreach ($this->cons as $con) {
+ if ($con) {
+ $sockets[] = $con->getSocket();
+ }
+ }
+ return $sockets;
+ }
+
+ /**
+ * Get the Stomp connection object associated with the given socket.
+ * @param resource $socket
+ * @return int index into connections list
+ * @throws Exception
+ */
+ protected function connectionFromSocket($socket)
+ {
+ foreach ($this->cons as $i => $con) {
+ if ($con && $con->getSocket() === $socket) {
+ return $i;
+ }
+ }
+ throw new Exception(__CLASS__ . " asked to read from unrecognized socket");
+ }
+
+ /**
+ * We've got input to handle on our socket!
+ * Read any waiting Stomp frame(s) and process them.
+ *
+ * @param resource $socket
+ * @return boolean ok on success
+ */
+ public function handleInput($socket)
+ {
+ $idx = $this->connectionFromSocket($socket);
+ $con = $this->cons[$idx];
+ $host = $con->getServer();
+ $this->defaultIdx = $idx;
+
+ $ok = true;
+ try {
+ $frames = $con->readFrames();
+ } catch (StompException $e) {
+ $this->_log(LOG_ERR, "Lost connection to $host: " . $e->getMessage());
+ fclose($socket); // ???
+ $this->cons[$idx] = null;
+ $this->transaction[$idx] = null;
+ $this->disconnect[$idx] = time();
+ return false;
+ }
+ foreach ($frames as $frame) {
+ $dest = $frame->headers['destination'];
+ if ($dest == $this->control) {
+ if (!$this->handleControlSignal($frame)) {
+ // We got a control event that requests a shutdown;
+ // close out and stop handling anything else!
+ break;
+ }
+ } else {
+ $ok = $this->handleItem($frame) && $ok;
+ }
+ $this->ack($idx, $frame);
+ $this->commit($idx);
+ $this->begin($idx);
+ }
+ return $ok;
+ }