]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - extlib/Stomp.php
A bit more instructive debugging
[quix0rs-gnu-social.git] / extlib / Stomp.php
1 <?php
2 /**
3  *
4  * Copyright 2005-2006 The Apache Software Foundation
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 /* vim: set expandtab tabstop=3 shiftwidth=3: */
20
21 require_once 'Stomp/Frame.php';
22
23 /**
24  * A Stomp Connection
25  *
26  *
27  * @package Stomp
28  * @author Hiram Chirino <hiram@hiramchirino.com>
29  * @author Dejan Bosanac <dejan@nighttale.net> 
30  * @author Michael Caplan <mcaplan@labnet.net>
31  * @version $Revision: 43 $
32  */
33 class Stomp
34 {
35     /**
36      * Perform request synchronously
37      *
38      * @var boolean
39      */
40     public $sync = false;
41
42     /**
43      * Default prefetch size
44      *
45      * @var int
46      */
47         public $prefetchSize = 1;
48     
49         /**
50      * Client id used for durable subscriptions
51      *
52      * @var string
53      */
54         public $clientId = null;
55     
56     protected $_brokerUri = null;
57     protected $_socket = null;
58     protected $_hosts = array();
59     protected $_params = array();
60     protected $_subscriptions = array();
61     protected $_defaultPort = 61613;
62     protected $_currentHost = - 1;
63     protected $_attempts = 10;
64     protected $_username = '';
65     protected $_password = '';
66     protected $_sessionId;
67     protected $_read_timeout_seconds = 60;
68     protected $_read_timeout_milliseconds = 0;
69     protected $_connect_timeout_seconds = 60;
70     
71     /**
72      * Constructor
73      *
74      * @param string $brokerUri Broker URL
75      * @throws StompException
76      */
77     public function __construct ($brokerUri)
78     {
79         $this->_brokerUri = $brokerUri;
80         $this->_init();
81     }
82     /**
83      * Initialize connection
84      *
85      * @throws StompException
86      */
87     protected function _init ()
88     {
89         $pattern = "|^(([a-zA-Z]+)://)+\(*([a-zA-Z0-9\.:/i,-]+)\)*\??([a-zA-Z0-9=]*)$|i";
90         if (preg_match($pattern, $this->_brokerUri, $regs)) {
91             $scheme = $regs[2];
92             $hosts = $regs[3];
93             $params = $regs[4];
94             if ($scheme != "failover") {
95                 $this->_processUrl($this->_brokerUri);
96             } else {
97                 $urls = explode(",", $hosts);
98                 foreach ($urls as $url) {
99                     $this->_processUrl($url);
100                 }
101             }
102             if ($params != null) {
103                 parse_str($params, $this->_params);
104             }
105         } else {
106             require_once 'Stomp/Exception.php';
107             throw new StompException("Bad Broker URL {$this->_brokerUri}");
108         }
109     }
110     /**
111      * Process broker URL
112      *
113      * @param string $url Broker URL
114      * @throws StompException
115      * @return boolean
116      */
117     protected function _processUrl ($url)
118     {
119         $parsed = parse_url($url);
120         if ($parsed) {
121             array_push($this->_hosts, array($parsed['host'] , $parsed['port'] , $parsed['scheme']));
122         } else {
123             require_once 'Stomp/Exception.php';
124             throw new StompException("Bad Broker URL $url");
125         }
126     }
127     /**
128      * Make socket connection to the server
129      *
130      * @throws StompException
131      */
132     protected function _makeConnection ()
133     {
134         if (count($this->_hosts) == 0) {
135             require_once 'Stomp/Exception.php';
136             throw new StompException("No broker defined");
137         }
138         
139         // force disconnect, if previous established connection exists
140         $this->disconnect();
141         
142         $i = $this->_currentHost;
143         $att = 0;
144         $connected = false;
145         $connect_errno = null;
146         $connect_errstr = null;
147         
148         while (! $connected && $att ++ < $this->_attempts) {
149             if (isset($this->_params['randomize']) && $this->_params['randomize'] == 'true') {
150                 $i = rand(0, count($this->_hosts) - 1);
151             } else {
152                 $i = ($i + 1) % count($this->_hosts);
153             }
154             $broker = $this->_hosts[$i];
155             $host = $broker[0];
156             $port = $broker[1];
157             $scheme = $broker[2];
158             if ($port == null) {
159                 $port = $this->_defaultPort;
160             }
161             if ($this->_socket != null) {
162                 fclose($this->_socket);
163                 $this->_socket = null;
164             }
165             $this->_socket = @fsockopen($scheme . '://' . $host, $port, $connect_errno, $connect_errstr, $this->_connect_timeout_seconds);
166             if (!is_resource($this->_socket) && $att >= $this->_attempts && !array_key_exists($i + 1, $this->_hosts)) {
167                 require_once 'Stomp/Exception.php';
168                 throw new StompException("Could not connect to $host:$port ($att/{$this->_attempts})");
169             } else if (is_resource($this->_socket)) {
170                 $connected = true;
171                 $this->_currentHost = $i;
172                 break;
173             }
174         }
175         if (! $connected) {
176             require_once 'Stomp/Exception.php';
177             throw new StompException("Could not connect to a broker");
178         }
179     }
180     /**
181      * Connect to server
182      *
183      * @param string $username
184      * @param string $password
185      * @return boolean
186      * @throws StompException
187      */
188     public function connect ($username = '', $password = '')
189     {
190         $this->_makeConnection();
191         if ($username != '') {
192             $this->_username = $username;
193         }
194         if ($password != '') {
195             $this->_password = $password;
196         }
197                 $headers = array('login' => $this->_username , 'passcode' => $this->_password);
198                 if ($this->clientId != null) {
199                         $headers["client-id"] = $this->clientId;
200                 }
201                 $frame = new StompFrame("CONNECT", $headers);
202         $this->_writeFrame($frame);
203         $frame = $this->readFrame();
204         if ($frame instanceof StompFrame && $frame->command == 'CONNECTED') {
205             $this->_sessionId = $frame->headers["session"];
206             return true;
207         } else {
208             require_once 'Stomp/Exception.php';
209             if ($frame instanceof StompFrame) {
210                 throw new StompException("Unexpected command: {$frame->command}", 0, $frame->body);
211             } else {
212                 throw new StompException("Connection not acknowledged");
213             }
214         }
215     }
216     
217     /**
218      * Check if client session has ben established
219      *
220      * @return boolean
221      */
222     public function isConnected ()
223     {
224         return !empty($this->_sessionId) && is_resource($this->_socket);
225     }
226     /**
227      * Current stomp session ID
228      *
229      * @return string
230      */
231     public function getSessionId()
232     {
233         return $this->_sessionId;
234     }
235     /**
236      * Send a message to a destination in the messaging system 
237      *
238      * @param string $destination Destination queue
239      * @param string|StompFrame $msg Message
240      * @param array $properties
241      * @param boolean $sync Perform request synchronously
242      * @return boolean
243      */
244     public function send ($destination, $msg, $properties = array(), $sync = null)
245     {
246         if ($msg instanceof StompFrame) {
247             $msg->headers['destination'] = $destination;
248             if (is_array($properties)) $msg->headers = array_merge($msg->headers, $properties);
249             $frame = $msg;
250         } else {
251             $headers = $properties;
252             $headers['destination'] = $destination;
253             $frame = new StompFrame('SEND', $headers, $msg);
254         }
255         $this->_prepareReceipt($frame, $sync);
256         $this->_writeFrame($frame);
257         return $this->_waitForReceipt($frame, $sync);
258     }
259     /**
260      * Prepair frame receipt
261      *
262      * @param StompFrame $frame
263      * @param boolean $sync
264      */
265     protected function _prepareReceipt (StompFrame $frame, $sync)
266     {
267         $receive = $this->sync;
268         if ($sync !== null) {
269             $receive = $sync;
270         }
271         if ($receive == true) {
272             $frame->headers['receipt'] = md5(microtime());
273         }
274     }
275     /**
276      * Wait for receipt
277      *
278      * @param StompFrame $frame
279      * @param boolean $sync
280      * @return boolean
281      * @throws StompException
282      */
283     protected function _waitForReceipt (StompFrame $frame, $sync)
284     {
285
286         $receive = $this->sync;
287         if ($sync !== null) {
288             $receive = $sync;
289         }
290         if ($receive == true) {
291             $id = (isset($frame->headers['receipt'])) ? $frame->headers['receipt'] : null;
292             if ($id == null) {
293                 return true;
294             }
295             $frame = $this->readFrame();
296             if ($frame instanceof StompFrame && $frame->command == 'RECEIPT') {
297                 if ($frame->headers['receipt-id'] == $id) {
298                     return true;
299                 } else {
300                     require_once 'Stomp/Exception.php';
301                     throw new StompException("Unexpected receipt id {$frame->headers['receipt-id']}", 0, $frame->body);
302                 }
303             } else {
304                 require_once 'Stomp/Exception.php';
305                 if ($frame instanceof StompFrame) {
306                     throw new StompException("Unexpected command {$frame->command}", 0, $frame->body);
307                 } else {
308                     throw new StompException("Receipt not received");
309                 }
310             }
311         }
312         return true;
313     }
314     /**
315      * Register to listen to a given destination
316      *
317      * @param string $destination Destination queue
318      * @param array $properties
319      * @param boolean $sync Perform request synchronously
320      * @return boolean
321      * @throws StompException
322      */
323     public function subscribe ($destination, $properties = null, $sync = null)
324     {
325         $headers = array('ack' => 'client');
326                 $headers['activemq.prefetchSize'] = $this->prefetchSize;
327                 if ($this->clientId != null) {
328                         $headers["activemq.subcriptionName"] = $this->clientId;
329                 }
330         if (isset($properties)) {
331             foreach ($properties as $name => $value) {
332                 $headers[$name] = $value;
333             }
334         }
335         $headers['destination'] = $destination;
336         $frame = new StompFrame('SUBSCRIBE', $headers);
337         $this->_prepareReceipt($frame, $sync);
338         $this->_writeFrame($frame);
339         if ($this->_waitForReceipt($frame, $sync) == true) {
340             $this->_subscriptions[$destination] = $properties;
341             return true;
342         } else {
343             return false;
344         }
345     }
346     /**
347      * Remove an existing subscription
348      *
349      * @param string $destination
350      * @param array $properties
351      * @param boolean $sync Perform request synchronously
352      * @return boolean
353      * @throws StompException
354      */
355     public function unsubscribe ($destination, $properties = null, $sync = null)
356     {
357         $headers = array();
358         if (isset($properties)) {
359             foreach ($properties as $name => $value) {
360                 $headers[$name] = $value;
361             }
362         }
363         $headers['destination'] = $destination;
364         $frame = new StompFrame('UNSUBSCRIBE', $headers);
365         $this->_prepareReceipt($frame, $sync);
366         $this->_writeFrame($frame);
367         if ($this->_waitForReceipt($frame, $sync) == true) {
368             unset($this->_subscriptions[$destination]);
369             return true;
370         } else {
371             return false;
372         }
373     }
374     /**
375      * Start a transaction
376      *
377      * @param string $transactionId
378      * @param boolean $sync Perform request synchronously
379      * @return boolean
380      * @throws StompException
381      */
382     public function begin ($transactionId = null, $sync = null)
383     {
384         $headers = array();
385         if (isset($transactionId)) {
386             $headers['transaction'] = $transactionId;
387         }
388         $frame = new StompFrame('BEGIN', $headers);
389         $this->_prepareReceipt($frame, $sync);
390         $this->_writeFrame($frame);
391         return $this->_waitForReceipt($frame, $sync);
392     }
393     /**
394      * Commit a transaction in progress
395      *
396      * @param string $transactionId
397      * @param boolean $sync Perform request synchronously
398      * @return boolean
399      * @throws StompException
400      */
401     public function commit ($transactionId = null, $sync = null)
402     {
403         $headers = array();
404         if (isset($transactionId)) {
405             $headers['transaction'] = $transactionId;
406         }
407         $frame = new StompFrame('COMMIT', $headers);
408         $this->_prepareReceipt($frame, $sync);
409         $this->_writeFrame($frame);
410         return $this->_waitForReceipt($frame, $sync);
411     }
412     /**
413      * Roll back a transaction in progress
414      *
415      * @param string $transactionId
416      * @param boolean $sync Perform request synchronously
417      */
418     public function abort ($transactionId = null, $sync = null)
419     {
420         $headers = array();
421         if (isset($transactionId)) {
422             $headers['transaction'] = $transactionId;
423         }
424         $frame = new StompFrame('ABORT', $headers);
425         $this->_prepareReceipt($frame, $sync);
426         $this->_writeFrame($frame);
427         return $this->_waitForReceipt($frame, $sync);
428     }
429     /**
430      * Acknowledge consumption of a message from a subscription
431          * Note: This operation is always asynchronous
432      *
433      * @param string|StompFrame $messageMessage ID
434      * @param string $transactionId
435      * @return boolean
436      * @throws StompException
437      */
438     public function ack ($message, $transactionId = null)
439     {
440         if ($message instanceof StompFrame) {
441             $headers = $message->headers;
442             if (isset($transactionId)) {
443                 $headers['transaction'] = $transactionId;
444             }                   
445             $frame = new StompFrame('ACK', $headers);
446             $this->_writeFrame($frame);
447             return true;
448         } else {
449             $headers = array();
450             if (isset($transactionId)) {
451                 $headers['transaction'] = $transactionId;
452             }
453             $headers['message-id'] = $message;
454             $frame = new StompFrame('ACK', $headers);
455             $this->_writeFrame($frame);
456             return true;
457         }
458     }
459     /**
460      * Graceful disconnect from the server
461      *
462      */
463     public function disconnect ()
464     {
465                 $headers = array();
466
467                 if ($this->clientId != null) {
468                         $headers["client-id"] = $this->clientId;
469                 }
470
471         if (is_resource($this->_socket)) {
472             $this->_writeFrame(new StompFrame('DISCONNECT', $headers));
473             fclose($this->_socket);
474         }
475         $this->_socket = null;
476         $this->_sessionId = null;
477         $this->_currentHost = -1;
478         $this->_subscriptions = array();
479         $this->_username = '';
480         $this->_password = '';
481     }
482     /**
483      * Write frame to server
484      *
485      * @param StompFrame $stompFrame
486      */
487     protected function _writeFrame (StompFrame $stompFrame)
488     {
489         if (!is_resource($this->_socket)) {
490             require_once 'Stomp/Exception.php';
491             throw new StompException('Socket connection hasn\'t been established');
492         }
493
494         $data = $stompFrame->__toString();
495         $r = fwrite($this->_socket, $data, strlen($data));
496         if ($r === false || $r == 0) {
497             $this->_reconnect();
498             $this->_writeFrame($stompFrame);
499         }
500     }
501     
502     /**
503      * Set timeout to wait for content to read
504      *
505      * @param int $seconds_to_wait  Seconds to wait for a frame
506      * @param int $milliseconds Milliseconds to wait for a frame
507      */
508     public function setReadTimeout($seconds, $milliseconds = 0) 
509     {
510         $this->_read_timeout_seconds = $seconds;
511         $this->_read_timeout_milliseconds = $milliseconds;
512     }
513     
514     /**
515      * Read response frame from server
516      *
517      * @return StompFrame False when no frame to read
518      */
519     public function readFrame ()
520     {
521         if (!$this->hasFrameToRead()) {
522             return false;
523         }
524         
525         $rb = 1024;
526         $data = '';
527         $end = false;
528         
529         do {
530             $read = fread($this->_socket, $rb);
531             if ($read === false) {
532                 $this->_reconnect();
533                 return $this->readFrame();
534             }
535             $data .= $read;
536             if (strpos($data, "\x00") !== false) {
537                 $end = true;
538                 $data = rtrim($data, "\n");
539             }
540             $len = strlen($data);
541         } while ($len < 2 || $end == false);
542         
543         list ($header, $body) = explode("\n\n", $data, 2);
544         $header = explode("\n", $header);
545         $headers = array();
546         $command = null;
547         foreach ($header as $v) {
548             if (isset($command)) {
549                 list ($name, $value) = explode(':', $v, 2);
550                 $headers[$name] = $value;
551             } else {
552                 $command = $v;
553             }
554         }
555         $frame = new StompFrame($command, $headers, trim($body));
556         if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') {
557             require_once 'Stomp/Message/Map.php';
558             return new StompMessageMap($frame);
559         } else {
560             return $frame;
561         }
562         return $frame;
563     }
564     
565     /**
566      * Check if there is a frame to read
567      *
568      * @return boolean
569      */
570     public function hasFrameToRead()
571     {
572         $read = array($this->_socket);
573         $write = null;
574         $except = null;
575         
576         $has_frame_to_read = @stream_select($read, $write, $except, $this->_read_timeout_seconds, $this->_read_timeout_milliseconds);
577         
578         if ($has_frame_to_read !== false)
579             $has_frame_to_read = count($read);
580
581
582         if ($has_frame_to_read === false) {
583             throw new StompException('Check failed to determine if the socket is readable');
584         } else if ($has_frame_to_read > 0) {
585             return true;
586         } else {
587             return false; 
588         }
589     }
590     
591     /**
592      * Reconnects and renews subscriptions (if there were any)
593      * Call this method when you detect connection problems     
594      */
595     protected function _reconnect ()
596     {
597         $subscriptions = $this->_subscriptions;
598         
599         $this->connect($this->_username, $this->_password);
600         foreach ($subscriptions as $dest => $properties) {
601             $this->subscribe($dest, $properties);
602         }
603     }
604     /**
605      * Graceful object desruction
606      *
607      */
608     public function __destruct()
609     {
610         $this->disconnect();
611     }
612 }
613 ?>