]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/liberalstomp.php
DB_DataObject recommends using ->tableName()
[quix0rs-gnu-social.git] / lib / liberalstomp.php
1 <?php
2
3 /**
4  * Based on code from Stomp PHP library, working around bugs in the base class.
5  *
6  * Original code is copyright 2005-2006 The Apache Software Foundation
7  * Modifications copyright 2009 StatusNet Inc by Brion Vibber <brion@status.net>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  */
21
22 class LiberalStomp extends Stomp
23 {
24     /**
25      * We need to be able to get the socket so advanced daemons can
26      * do a select() waiting for input both from the queue and from
27      * other sources such as an XMPP connection.
28      *
29      * @return resource
30      */
31     function getSocket()
32     {
33         return $this->_socket;
34     }
35
36     /**
37      * Return the host we're currently connected to.
38      *
39      * @return string
40      */
41     function getServer()
42     {
43         $idx = $this->_currentHost;
44         if ($idx >= 0) {
45             $host = $this->_hosts[$idx];
46             return "$host[0]:$host[1]";
47         } else {
48             return '[unconnected]';
49         }
50     }
51
52     /**
53      * Make socket connection to the server
54      * We also set the stream to non-blocking mode, since we'll be
55      * select'ing to wait for updates. In blocking mode it seems
56      * to get confused sometimes.
57      *
58      * @throws StompException
59      */
60     protected function _makeConnection ()
61     {
62         parent::_makeConnection();
63         stream_set_blocking($this->_socket, 0);
64     }
65
66     /**
67      * Version 1.0.0 of the Stomp library gets confused if messages
68      * come in too fast over the connection. This version will read
69      * out as many frames as are ready to be read from the socket.
70      *
71      * Modified from Stomp::readFrame()
72      *
73      * @return StompFrame False when no frame to read
74      */
75     public function readFrames ()
76     {
77         if (!$this->hasFrameToRead()) {
78             return false;
79         }
80         
81         $rb = 1024;
82         $data = '';
83         $end = false;
84         $frames = array();
85
86         do {
87             // @fixme this sometimes hangs in blocking mode...
88             // shouldn't we have been idle until we found there's more data?
89             $read = fread($this->_socket, $rb);
90             if ($read === false || ($read === '' && feof($this->_socket))) {
91                 // @fixme possibly attempt an auto reconnect as old code?
92                 throw new StompException("Error reading");
93                 //$this->_reconnect();
94                 // @fixme this will lose prior items
95                 //return $this->readFrames();
96             }
97             $data .= $read;
98             if (strpos($data, "\x00") !== false) {
99                 // Frames are null-delimited, but some servers
100                 // may append an extra \n according to old bug reports.
101                 $data = str_replace("\x00\n", "\x00", $data);
102                 $chunks = explode("\x00", $data);
103
104                 $data = array_pop($chunks);
105                 $frames = array_merge($frames, $chunks);
106                 if ($data == '') {
107                     // We're at the end of a frame; stop reading.
108                     break;
109                 } else {
110                     // In the middle of a frame; keep going.
111                 }
112             }
113             // @fixme find out why this len < 2 check was there
114             //$len = strlen($data);
115         } while (true);//$len < 2 || $end == false);
116
117         return array_map(array($this, 'parseFrame'), $frames);
118     }
119     
120     /**
121      * Parse a raw Stomp frame into an object.
122      * Extracted from Stomp::readFrame()
123      *
124      * @param string $data
125      * @return StompFrame
126      */
127     function parseFrame($data)
128     {
129         list ($header, $body) = explode("\n\n", $data, 2);
130         $header = explode("\n", $header);
131         $headers = array();
132         $command = null;
133         foreach ($header as $v) {
134             if (isset($command)) {
135                 list ($name, $value) = explode(':', $v, 2);
136                 $headers[$name] = $value;
137             } else {
138                 $command = $v;
139             }
140         }
141         $frame = new StompFrame($command, $headers, trim($body));
142         if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
143             require_once 'Stomp/Message/Map.php';
144             return new StompMessageMap($frame);
145         } else {
146             return $frame;
147         }
148         return $frame;
149     }
150
151     /**
152      * Write frame to server
153      *
154      * @param StompFrame $stompFrame
155      */
156     protected function _writeFrame (StompFrame $stompFrame)
157     {
158         if (!is_resource($this->_socket)) {
159             require_once 'Stomp/Exception.php';
160             throw new StompException('Socket connection hasn\'t been established');
161         }
162
163         $data = $stompFrame->__toString();
164
165         // Make sure the socket's in a writable state; if not, wait a bit.
166         stream_set_blocking($this->_socket, 1);
167
168         $r = fwrite($this->_socket, $data, strlen($data));
169         stream_set_blocking($this->_socket, 0);
170         if ($r === false || $r == 0) {
171             $this->_reconnect();
172             $this->_writeFrame($stompFrame);
173         }
174     }
175  }
176