From 262dbeac787ad3aecb28c470484eb3fc8d036d93 Mon Sep 17 00:00:00 2001 From: Evan Prodromou Date: Sun, 26 Apr 2009 12:06:50 -0400 Subject: [PATCH] Some updates for testing Comet --- plugins/Comet/bayeux.class.inc.php | 2 - plugins/Comet/jquery.comet.js | 1814 ++++++++++++++++++++++------ plugins/Comet/updatetimeline.js | 27 + 3 files changed, 1478 insertions(+), 365 deletions(-) diff --git a/plugins/Comet/bayeux.class.inc.php b/plugins/Comet/bayeux.class.inc.php index 602a7b6446..785d3e3935 100644 --- a/plugins/Comet/bayeux.class.inc.php +++ b/plugins/Comet/bayeux.class.inc.php @@ -69,8 +69,6 @@ class Bayeux $oReturn = json_decode($data); - common_debug(print_r($oReturn, true)); - if (is_array($oReturn)) { $oReturn = $oReturn[0]; } diff --git a/plugins/Comet/jquery.comet.js b/plugins/Comet/jquery.comet.js index 2124e882cb..6de437fa8e 100644 --- a/plugins/Comet/jquery.comet.js +++ b/plugins/Comet/jquery.comet.js @@ -1,363 +1,1451 @@ -(function($) -{ - var msgHandshake = - { - version: '1.0', - minimumVersion: '0.9', - channel: '/meta/handshake' - }; - - var oTransport = function() - { - this._bXD = - (($.comet._sUrl.substring(0,4) == 'http') && ($.comet._sUrl.substr(7,location.href.length).replace(/\/.*/, '') != location.host)) - ? - true - :false; - - this.connectionType = (this._bXD) ? 'callback-polling' : 'long-polling'; - - this.startup = function(oReturn) - { - if(this._comet._bConnected) return; - this.tunnelInit(); - }; - - this.tunnelInit = function() - { - var msgConnect = - { - channel: '/meta/connect', - clientId: $.comet.clientId, - id: String($.comet._nNextId++), - connectionType: $.comet._oTransport.connectionType - }; - - this.openTunnel(msgConnect); - }; - - this.openTunnel = function(oMsg) - { - $.comet._bPolling = true; - - this._send($.comet._sUrl, oMsg, function(sReturn) - { - var oReturn = (typeof sReturn != "object") ? (eval('(' + sReturn + ')')) : sReturn; - $.comet._bPolling = false; - $.comet.deliver(oReturn); - $.comet._oTransport.closeTunnel(); - }); - }; - - this.closeTunnel = function() - { - if(!$.comet._bInitialized) return; - - if($.comet._advice) - { - if($.comet._advice.reconnect == 'none') return; - - if($.comet._advice.interval > 0) - { - setTimeout($.comet._oTransport._connect, $.comet._advice.interval); - } - else - { - $.comet._oTransport._connect(); - } - } - else - { - $.comet._oTransport._connect(); - } - }; - - this._connect = function() - { - if(!$.comet._bInitialized) return; - - if($.comet._bPolling) return; - - if($.comet._advice && $.comet._advice.reconnect == 'handshake') - { - $.comet._bConnected = false; - $.comet.init($.comet._sUrl); - } - else if($.comet._bConnected) - { - var msgConnect = - { - //jsonp: 'test', - clientId: $.comet.clientId, - id: String($.comet._nNextId++), - channel: '/meta/connect', - connectionType: $.comet._oTransport.connectionType - }; - $.comet._oTransport.openTunnel(msgConnect); - } - }; - - this._send = function(sUrl, oMsg, fCallback) { - //default callback will check advice, deliver messages, and reconnect - var fCallback = (fCallback) ? fCallback : function(sReturn) - { - var oReturn = (typeof sReturn != "object") ? (eval('(' + sReturn + ')')) : sReturn; - - $.comet.deliver(oReturn); - - if($.comet._advice) - { - if($.comet._advice.reconnect == 'none') - return; - - if($.comet._advice.interval > 0) - { - setTimeout($.comet._oTransport._connect, $.comet._advice.interval); - } - else - { - $.comet._oTransport._connect(); - } - } - else - { - $.comet._oTransport._connect(); - } - }; - - //regular AJAX for same domain calls - if((!this._bXD) && (this.connectionType == 'long-polling')) - { - this._pollRequest = $.ajax({ - url: sUrl, - type: 'post', - beforeSend: function(oXhr) { oXhr.setRequestHeader('Connection', 'Keep-Alive'); }, - data: { message: JSON.stringify(oMsg) }, - success: fCallback - }); - } - else // JSONP callback for cross domain - { - this._pollRequest = $.ajax({ - url: sUrl, - dataType: 'jsonp', - jsonp: 'jsonp', - beforeSend: function(oXhr) { oXhr.setRequestHeader('Connection', 'Keep-Alive'); }, - data: - { - message: JSON.stringify($.extend(oMsg,{connectionType: 'callback-polling' })) - }, - success: fCallback - }); - } - } - }; - - $.comet = new function() - { - this.CONNECTED = 'CONNECTED'; - this.CONNECTING = 'CONNECTING'; - this.DISCONNECTED = 'DISCONNECTED'; - this.DISCONNECTING = 'DISCONNECTING'; - - this._aMessageQueue = []; - this._aSubscriptions = []; - this._aSubscriptionCallbacks = []; - this._bInitialized = false; - this._bConnected = false; - this._nBatch = 0; - this._nNextId = 0; - // just define the transport, do not assign it yet. - this._oTransport = ''; //oTransport; - this._sUrl = ''; - - this.supportedConectionTypes = [ 'long-polling', 'callback-polling' ]; - - this.clientId = ''; - - this._bTrigger = true; // this sends $.event.trigger(channel, data) - - this.init = function(sUrl) - { - this._sUrl = (sUrl) ? sUrl : '/cometd'; - - this._oTransport = new oTransport(); - - this._aMessageQueue = []; - this._aSubscriptions = []; - this._bInitialized = true; - this.startBatch(); - - var oMsg = $.extend(msgHandshake, {id: String(this._nNextId++)}); - - this._oTransport._send(this._sUrl, oMsg, $.comet._finishInit); - }; - - this._finishInit = function(sReturn) - { - var oReturn = (typeof sReturn != "object") ? (eval('(' + sReturn + ')')[0]) : sReturn[0]; - - if(oReturn.advice) - $.comet._advice = oReturn.advice; - - var bSuccess = (oReturn.successful) ? oReturn.successful : false; - // do version check - - if(bSuccess) - { - // pick transport ? - // ...... - - $.comet._oTransport._comet = $.comet; - $.comet._oTransport.version = $.comet.version; - - $.comet.clientId = oReturn.clientId; - $.comet._oTransport.startup(oReturn); - $.comet.endBatch(); - } - }; - - this._sendMessage = function(oMsg) - { - if($.comet._nBatch <= 0) - { - if(oMsg.length > 0) - for(var i in oMsg) - { - oMsg[i].clientId = String($.comet.clientId); - oMsg[i].id = String($.comet._nNextId++); - } - else - { - oMsg.clientId = String($.comet.clientId); - oMsg.id = String($.comet._nNextId++); - } - - $.comet._oTransport._send($.comet._sUrl, oMsg); - } - else - { - $.comet._aMessageQueue.push(oMsg); - } - }; - - - this.startBatch = function() { this._nBatch++ }; - this.endBatch = function() { - if(--this._nBatch <= 0) - { - this._nBatch = 0; - if(this._aMessageQueue.length > 0) - { - this._sendMessage(this._aMessageQueue); - this._aMessageQueue = []; - } - } - }; - - this.subscribe = function(sSubscription, fCallback) - { - // if this topic has not been subscribed to yet, send the message now - if(!this._aSubscriptions[sSubscription]) - { - this._aSubscriptions.push(sSubscription) - - if (fCallback) { - this._aSubscriptionCallbacks[sSubscription] = fCallback; - } - - this._sendMessage({ channel: '/meta/subscribe', subscription: sSubscription }); - } - - //$.event.add(window, sSubscription, fCallback); - }; - - this.unsubscribe = function(sSubscription) { - $.comet._sendMessage({ channel: '/meta/unsubscribe', subscription: sSubscription }); - }; - - this.publish = function(sChannel, oData) - { - $.comet._sendMessage({channel: sChannel, data: oData}); - }; - - this.deliver = function(sReturn) - { - var oReturn = sReturn;//eval(sReturn); - - $(oReturn).each(function() - { - $.comet._deliver(this); - }); - }; - - this.disconnect = function() - { - $($.comet._aSubscriptions).each(function(i) - { - $.comet.unsubscribe($.comet._aSubscriptions[i]); - }); - - $.comet._sendMessage({channel:'/meta/disconnect'}); - - $.comet._bInitialized = false; - } - - this._deliver = function(oMsg,oData) - { - if(oMsg.advice) - { - $.comet._advice = oMsg.advice; - } - - switch(oMsg.channel) - { - case '/meta/connect': - if(oMsg.successful && !$.comet._bConnected) - { - $.comet._bConnected = $.comet._bInitialized; - $.comet.endBatch(); - /* - $.comet._sendMessage(msgConnect); - */ - } - else - {} - //$.comet._bConnected = false; - break; - - // add in subscription handling stuff - case '/meta/subscribe': - if(!oMsg.successful) - { - $.comet._oTransport._cancelConnect(); - return; - } - break; - - case '/meta/unsubscribe': - if(!oMsg.successful) - { - $.comet._oTransport._cancelConnect(); - return; - } - break; - - } - - if(oMsg.data) - { - if($.comet._bTrigger) - { - $.event.trigger(oMsg.channel, [oMsg]); - } - - var cb = $.comet._aSubscriptionCallbacks[oMsg.channel]; - if (cb) { - cb(oMsg); - } - } - }; -}; - -})(jQuery); +/** + * Copyright 2008 Mort Bay Consulting Pty. Ltd. + * Dual licensed under the Apache License 2.0 and the MIT license. + * ---------------------------------------------------------------------------- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http: *www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ---------------------------------------------------------------------------- + * Licensed under the MIT license; + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * ---------------------------------------------------------------------------- + * $Revision$ $Date$ + */ +(function($) +{ + /** + * The constructor for a Comet object. + * There is a default Comet instance already created at the variable $.cometd, + * and hence that can be used to start a comet conversation with a server. + * In the rare case a page needs more than one comet conversation, a new instance can be + * created via: + *
+     * var url2 = ...;
+     * var cometd2 = new $.Cometd();
+     * cometd2.init(url2);
+     * 
+ */ + $.Cometd = function(name) + { + var _name = name || 'default'; + var _logPriorities = { debug: 1, info: 2, warn: 3, error: 4 }; + var _logLevel = 'info'; + var _url; + var _xd = false; + var _transport; + var _status = 'disconnected'; + var _messageId = 0; + var _clientId = null; + var _batch = 0; + var _messageQueue = []; + var _listeners = {}; + var _backoff = 0; + var _backoffIncrement = 1000; + var _maxBackoff = 60000; + var _scheduledSend = null; + var _extensions = []; + var _advice = {}; + var _handshakeProps; + + /** + * Returns the name assigned to this Comet object, or the string 'default' + * if no name has been explicitely passed as parameter to the constructor. + */ + this.getName = function() + { + return _name; + }; + + /** + * Configures the initial comet communication with the comet server. + * @param cometURL the URL of the comet server + */ + this.configure = function(cometURL) + { + _configure(cometURL); + }; + + function _configure(cometURL) + { + _url = cometURL; + _debug('Initializing comet with url: {}', _url); + + // Check immediately if we're cross domain + // If cross domain, the handshake must not send the long polling transport type + var urlParts = /(^https?:)?(\/\/(([^:\/\?#]+)(:(\d+))?))?([^\?#]*)/.exec(cometURL); + if (urlParts[3]) _xd = urlParts[3] != location.host; + + // Temporary setup a transport to send the initial handshake + // The transport may be changed as a result of handshake + if (_xd) + _transport = newCallbackPollingTransport(); + else + _transport = newLongPollingTransport(); + _debug('Initial transport is {}', _transport.getType()); + }; + + /** + * Configures and establishes the comet communication with the comet server + * via a handshake and a subsequent connect. + * @param cometURL the URL of the comet server + * @param handshakeProps an object to be merged with the handshake message + * @see #configure(cometURL) + * @see #handshake(handshakeProps) + */ + this.init = function(cometURL, handshakeProps) + { + _configure(cometURL); + _handshake(handshakeProps); + }; + + /** + * Establishes the comet communication with the comet server + * via a handshake and a subsequent connect. + * @param handshakeProps an object to be merged with the handshake message + */ + this.handshake = function(handshakeProps) + { + _handshake(handshakeProps); + }; + + /** + * Disconnects from the comet server. + * @param disconnectProps an object to be merged with the disconnect message + */ + this.disconnect = function(disconnectProps) + { + var bayeuxMessage = { + channel: '/meta/disconnect' + }; + var message = $.extend({}, disconnectProps, bayeuxMessage); + // Deliver immediately + // The handshake and connect mechanism make use of startBatch(), and in case + // of a failed handshake the disconnect would not be delivered if using _send(). + _setStatus('disconnecting'); + _deliver([message], false); + }; + + /** + * Marks the start of a batch of application messages to be sent to the server + * in a single request, obtaining a single response containing (possibly) many + * application reply messages. + * Messages are held in a queue and not sent until {@link #endBatch()} is called. + * If startBatch() is called multiple times, then an equal number of endBatch() + * calls must be made to close and send the batch of messages. + * @see #endBatch() + */ + this.startBatch = function() + { + _startBatch(); + }; + + /** + * Marks the end of a batch of application messages to be sent to the server + * in a single request. + * @see #startBatch() + */ + this.endBatch = function() + { + _endBatch(true); + }; + + /** + * Subscribes to the given channel, performing the given callback in the given scope + * when a message for the channel arrives. + * @param channel the channel to subscribe to + * @param scope the scope of the callback + * @param callback the callback to call when a message is delivered to the channel + * @param subscribeProps an object to be merged with the subscribe message + * @return the subscription handle to be passed to {@link #unsubscribe(object)} + */ + this.subscribe = function(channel, scope, callback, subscribeProps) + { + var subscription = this.addListener(channel, scope, callback); + + // Send the subscription message after the subscription registration to avoid + // races where the server would deliver a message to the subscribers, but here + // on the client the subscription has not been added yet to the data structures + var bayeuxMessage = { + channel: '/meta/subscribe', + subscription: channel + }; + var message = $.extend({}, subscribeProps, bayeuxMessage); + _send(message); + + return subscription; + }; + + /** + * Unsubscribes the subscription obtained with a call to {@link #subscribe(string, object, function)}. + * @param subscription the subscription to unsubscribe. + */ + this.unsubscribe = function(subscription, unsubscribeProps) + { + // Remove the local listener before sending the message + // This ensures that if the server fails, this client does not get notifications + this.removeListener(subscription); + var bayeuxMessage = { + channel: '/meta/unsubscribe', + subscription: subscription[0] + }; + var message = $.extend({}, unsubscribeProps, bayeuxMessage); + _send(message); + }; + + /** + * Publishes a message on the given channel, containing the given content. + * @param channel the channel to publish the message to + * @param content the content of the message + * @param publishProps an object to be merged with the publish message + */ + this.publish = function(channel, content, publishProps) + { + var bayeuxMessage = { + channel: channel, + data: content + }; + var message = $.extend({}, publishProps, bayeuxMessage); + _send(message); + }; + + /** + * Adds a listener for bayeux messages, performing the given callback in the given scope + * when a message for the given channel arrives. + * @param channel the channel the listener is interested to + * @param scope the scope of the callback + * @param callback the callback to call when a message is delivered to the channel + * @returns the subscription handle to be passed to {@link #removeListener(object)} + * @see #removeListener(object) + */ + this.addListener = function(channel, scope, callback) + { + // The data structure is a map, where each subscription + // holds the callback to be called and its scope. + + // Normalize arguments + if (!callback) + { + callback = scope; + scope = undefined; + } + + var subscription = { + scope: scope, + callback: callback + }; + + var subscriptions = _listeners[channel]; + if (!subscriptions) + { + subscriptions = []; + _listeners[channel] = subscriptions; + } + // Pushing onto an array appends at the end and returns the id associated with the element increased by 1. + // Note that if: + // a.push('a'); var hb=a.push('b'); delete a[hb-1]; var hc=a.push('c'); + // then: + // hc==3, a.join()=='a',,'c', a.length==3 + var subscriptionIndex = subscriptions.push(subscription) - 1; + _debug('Added listener: channel \'{}\', callback \'{}\', index {}', channel, callback.name, subscriptionIndex); + + // The subscription to allow removal of the listener is made of the channel and the index + return [channel, subscriptionIndex]; + }; + + /** + * Removes the subscription obtained with a call to {@link #addListener(string, object, function)}. + * @param subscription the subscription to unsubscribe. + */ + this.removeListener = function(subscription) + { + var subscriptions = _listeners[subscription[0]]; + if (subscriptions) + { + delete subscriptions[subscription[1]]; + _debug('Removed listener: channel \'{}\', index {}', subscription[0], subscription[1]); + } + }; + + /** + * Removes all listeners registered with {@link #addListener(channel, scope, callback)} or + * {@link #subscribe(channel, scope, callback)}. + */ + this.clearListeners = function() + { + _listeners = {}; + }; + + /** + * Returns a string representing the status of the bayeux communication with the comet server. + */ + this.getStatus = function() + { + return _status; + }; + + /** + * Sets the backoff period used to increase the backoff time when retrying an unsuccessful or failed message. + * Default value is 1 second, which means if there is a persistent failure the retries will happen + * after 1 second, then after 2 seconds, then after 3 seconds, etc. So for example with 15 seconds of + * elapsed time, there will be 5 retries (at 1, 3, 6, 10 and 15 seconds elapsed). + * @param period the backoff period to set + * @see #getBackoffIncrement() + */ + this.setBackoffIncrement = function(period) + { + _backoffIncrement = period; + }; + + /** + * Returns the backoff period used to increase the backoff time when retrying an unsuccessful or failed message. + * @see #setBackoffIncrement(period) + */ + this.getBackoffIncrement = function() + { + return _backoffIncrement; + }; + + /** + * Returns the backoff period to wait before retrying an unsuccessful or failed message. + */ + this.getBackoffPeriod = function() + { + return _backoff; + }; + + /** + * Sets the log level for console logging. + * Valid values are the strings 'error', 'warn', 'info' and 'debug', from + * less verbose to more verbose. + * @param level the log level string + */ + this.setLogLevel = function(level) + { + _logLevel = level; + }; + + /** + * Registers an extension whose callbacks are called for every incoming message + * (that comes from the server to this client implementation) and for every + * outgoing message (that originates from this client implementation for the + * server). + * The format of the extension object is the following: + *
+         * {
+         *     incoming: function(message) { ... },
+         *     outgoing: function(message) { ... }
+         * }
+         * Both properties are optional, but if they are present they will be called
+         * respectively for each incoming message and for each outgoing message.
+         * 
+ * @param name the name of the extension + * @param extension the extension to register + * @return true if the extension was registered, false otherwise + * @see #unregisterExtension(name) + */ + this.registerExtension = function(name, extension) + { + var existing = false; + for (var i = 0; i < _extensions.length; ++i) + { + var existingExtension = _extensions[i]; + if (existingExtension.name == name) + { + existing = true; + return false; + } + } + if (!existing) + { + _extensions.push({ + name: name, + extension: extension + }); + _debug('Registered extension \'{}\'', name); + return true; + } + else + { + _info('Could not register extension with name \'{}\': another extension with the same name already exists'); + return false; + } + }; + + /** + * Unregister an extension previously registered with + * {@link #registerExtension(name, extension)}. + * @param name the name of the extension to unregister. + * @return true if the extension was unregistered, false otherwise + */ + this.unregisterExtension = function(name) + { + var unregistered = false; + $.each(_extensions, function(index, extension) + { + if (extension.name == name) + { + _extensions.splice(index, 1); + unregistered = true; + _debug('Unregistered extension \'{}\'', name); + return false; + } + }); + return unregistered; + }; + + /** + * Starts a the batch of messages to be sent in a single request. + * @see _endBatch(deliverMessages) + */ + function _startBatch() + { + ++_batch; + }; + + /** + * Ends the batch of messages to be sent in a single request, + * optionally delivering messages present in the message queue depending + * on the given argument. + * @param deliverMessages whether to deliver the messages in the queue or not + * @see _startBatch() + */ + function _endBatch(deliverMessages) + { + --_batch; + if (_batch < 0) _batch = 0; + if (deliverMessages && _batch == 0 && !_isDisconnected()) + { + var messages = _messageQueue; + _messageQueue = []; + if (messages.length > 0) _deliver(messages, false); + } + }; + + function _nextMessageId() + { + return ++_messageId; + }; + + /** + * Converts the given response into an array of bayeux messages + * @param response the response to convert + * @return an array of bayeux messages obtained by converting the response + */ + function _convertToMessages(response) + { + if (response === undefined) return []; + if (response instanceof Array) return response; + if (response instanceof String || typeof response == 'string') return eval('(' + response + ')'); + if (response instanceof Object) return [response]; + throw 'Conversion Error ' + response + ', typeof ' + (typeof response); + }; + + function _setStatus(newStatus) + { + _debug('{} -> {}', _status, newStatus); + _status = newStatus; + }; + + function _isDisconnected() + { + return _status == 'disconnecting' || _status == 'disconnected'; + }; + + /** + * Sends the initial handshake message + */ + function _handshake(handshakeProps) + { + _debug('Starting handshake'); + _clientId = null; + + // Start a batch. + // This is needed because handshake and connect are async. + // It may happen that the application calls init() then subscribe() + // and the subscribe message is sent before the connect message, if + // the subscribe message is not held until the connect message is sent. + // So here we start a batch to hold temporarly any message until + // the connection is fully established. + _batch = 0; + _startBatch(); + + // Save the original properties provided by the user + // Deep copy to avoid the user to be able to change them later + _handshakeProps = $.extend(true, {}, handshakeProps); + + var bayeuxMessage = { + version: '1.0', + minimumVersion: '0.9', + channel: '/meta/handshake', + supportedConnectionTypes: _xd ? ['callback-polling'] : ['long-polling', 'callback-polling'] + }; + // Do not allow the user to mess with the required properties, + // so merge first the user properties and *then* the bayeux message + var message = $.extend({}, handshakeProps, bayeuxMessage); + + // We started a batch to hold the application messages, + // so here we must bypass it and deliver immediately. + _setStatus('handshaking'); + _deliver([message], false); + }; + + function _findTransport(handshakeResponse) + { + var transportTypes = handshakeResponse.supportedConnectionTypes; + if (_xd) + { + // If we are cross domain, check if the server supports it, that's the only option + if ($.inArray('callback-polling', transportTypes) >= 0) return _transport; + } + else + { + // Check if we can keep long-polling + if ($.inArray('long-polling', transportTypes) >= 0) return _transport; + + // The server does not support long-polling + if ($.inArray('callback-polling', transportTypes) >= 0) return newCallbackPollingTransport(); + } + return null; + }; + + function _delayedHandshake() + { + _setStatus('handshaking'); + _delayedSend(function() + { + _handshake(_handshakeProps); + }); + }; + + function _delayedConnect() + { + _setStatus('connecting'); + _delayedSend(function() + { + _connect(); + }); + }; + + function _delayedSend(operation) + { + _cancelDelayedSend(); + var delay = _backoff; + _debug("Delayed send: backoff {}, interval {}", _backoff, _advice.interval); + if (_advice.interval && _advice.interval > 0) + delay += _advice.interval; + _scheduledSend = _setTimeout(operation, delay); + }; + + function _cancelDelayedSend() + { + if (_scheduledSend !== null) clearTimeout(_scheduledSend); + _scheduledSend = null; + }; + + function _setTimeout(funktion, delay) + { + return setTimeout(function() + { + try + { + funktion(); + } + catch (x) + { + _debug('Exception during scheduled execution of function \'{}\': {}', funktion.name, x); + } + }, delay); + }; + + /** + * Sends the connect message + */ + function _connect() + { + _debug('Starting connect'); + var message = { + channel: '/meta/connect', + connectionType: _transport.getType() + }; + _setStatus('connecting'); + _deliver([message], true); + _setStatus('connected'); + }; + + function _send(message) + { + if (_batch > 0) + _messageQueue.push(message); + else + _deliver([message], false); + }; + + /** + * Delivers the messages to the comet server + * @param messages the array of messages to send + */ + function _deliver(messages, comet) + { + // We must be sure that the messages have a clientId. + // This is not guaranteed since the handshake may take time to return + // (and hence the clientId is not known yet) and the application + // may create other messages. + $.each(messages, function(index, message) + { + message['id'] = _nextMessageId(); + if (_clientId) message['clientId'] = _clientId; + messages[index] = _applyOutgoingExtensions(message); + }); + + var self = this; + var envelope = { + url: _url, + messages: messages, + onSuccess: function(request, response) + { + try + { + _handleSuccess.call(self, request, response, comet); + } + catch (x) + { + _debug('Exception during execution of success callback: {}', x); + } + }, + onFailure: function(request, reason, exception) + { + try + { + _handleFailure.call(self, request, messages, reason, exception, comet); + } + catch (x) + { + _debug('Exception during execution of failure callback: {}', x); + } + } + }; + _debug('Sending request to {}, message(s): {}', envelope.url, JSON.stringify(envelope.messages)); + _transport.send(envelope, comet); + }; + + function _applyIncomingExtensions(message) + { + for (var i = 0; i < _extensions.length; ++i) + { + var extension = _extensions[i]; + var callback = extension.extension.incoming; + if (callback && typeof callback === 'function') + { + _debug('Calling incoming extension \'{}\', callback \'{}\'', extension.name, callback.name); + message = _applyExtension(extension.name, callback, message) || message; + } + } + return message; + }; + + function _applyOutgoingExtensions(message) + { + for (var i = 0; i < _extensions.length; ++i) + { + var extension = _extensions[i]; + var callback = extension.extension.outgoing; + if (callback && typeof callback === 'function') + { + _debug('Calling outgoing extension \'{}\', callback \'{}\'', extension.name, callback.name); + message = _applyExtension(extension.name, callback, message) || message; + } + } + return message; + }; + + function _applyExtension(name, callback, message) + { + try + { + return callback(message); + } + catch (x) + { + _debug('Exception during execution of extension \'{}\': {}', name, x); + return message; + } + }; + + function _handleSuccess(request, response, comet) + { + var messages = _convertToMessages(response); + _debug('Received response {}', JSON.stringify(messages)); + + // Signal the transport it can deliver other queued requests + _transport.complete(request, true, comet); + + for (var i = 0; i < messages.length; ++i) + { + var message = messages[i]; + message = _applyIncomingExtensions(message); + + if (message.advice) _advice = message.advice; + + var channel = message.channel; + switch (channel) + { + case '/meta/handshake': + _handshakeSuccess(message); + break; + case '/meta/connect': + _connectSuccess(message); + break; + case '/meta/disconnect': + _disconnectSuccess(message); + break; + case '/meta/subscribe': + _subscribeSuccess(message); + break; + case '/meta/unsubscribe': + _unsubscribeSuccess(message); + break; + default: + _messageSuccess(message); + break; + } + } + }; + + function _handleFailure(request, messages, reason, exception, comet) + { + var xhr = request.xhr; + _debug('Request failed, status: {}, reason: {}, exception: {}', xhr && xhr.status, reason, exception); + + // Signal the transport it can deliver other queued requests + _transport.complete(request, false, comet); + + for (var i = 0; i < messages.length; ++i) + { + var message = messages[i]; + var channel = message.channel; + switch (channel) + { + case '/meta/handshake': + _handshakeFailure(xhr, message); + break; + case '/meta/connect': + _connectFailure(xhr, message); + break; + case '/meta/disconnect': + _disconnectFailure(xhr, message); + break; + case '/meta/subscribe': + _subscribeFailure(xhr, message); + break; + case '/meta/unsubscribe': + _unsubscribeFailure(xhr, message); + break; + default: + _messageFailure(xhr, message); + break; + } + } + }; + + function _handshakeSuccess(message) + { + if (message.successful) + { + _debug('Handshake successful'); + // Save clientId, figure out transport, then follow the advice to connect + _clientId = message.clientId; + + var newTransport = _findTransport(message); + if (newTransport === null) + { + throw 'Could not agree on transport with server'; + } + else + { + if (_transport.getType() != newTransport.getType()) + { + _debug('Changing transport from {} to {}', _transport.getType(), newTransport.getType()); + _transport = newTransport; + } + } + + // Notify the listeners + // Here the new transport is in place, as well as the clientId, so + // the listener can perform a publish() if it wants, and the listeners + // are notified before the connect below. + _notifyListeners('/meta/handshake', message); + + var action = _advice.reconnect ? _advice.reconnect : 'retry'; + switch (action) + { + case 'retry': + _delayedConnect(); + break; + default: + break; + } + } + else + { + _debug('Handshake unsuccessful'); + + var retry = !_isDisconnected() && _advice.reconnect != 'none'; + if (!retry) _setStatus('disconnected'); + + _notifyListeners('/meta/handshake', message); + _notifyListeners('/meta/unsuccessful', message); + + // Only try again if we haven't been disconnected and + // the advice permits us to retry the handshake + if (retry) + { + _increaseBackoff(); + _debug('Handshake failure, backing off and retrying in {} ms', _backoff); + _delayedHandshake(); + } + } + }; + + function _handshakeFailure(xhr, message) + { + _debug('Handshake failure'); + + // Notify listeners + var failureMessage = { + successful: false, + failure: true, + channel: '/meta/handshake', + request: message, + xhr: xhr, + advice: { + action: 'retry', + interval: _backoff + } + }; + + var retry = !_isDisconnected() && _advice.reconnect != 'none'; + if (!retry) _setStatus('disconnected'); + + _notifyListeners('/meta/handshake', failureMessage); + _notifyListeners('/meta/unsuccessful', failureMessage); + + // Only try again if we haven't been disconnected and the + // advice permits us to try again + if (retry) + { + _increaseBackoff(); + _debug('Handshake failure, backing off and retrying in {} ms', _backoff); + _delayedHandshake(); + } + }; + + function _connectSuccess(message) + { + var action = _isDisconnected() ? 'none' : (_advice.reconnect ? _advice.reconnect : 'retry'); + if (!_isDisconnected()) _setStatus(action == 'retry' ? 'connecting' : 'disconnecting'); + + if (message.successful) + { + _debug('Connect successful'); + + // End the batch and allow held messages from the application + // to go to the server (see _handshake() where we start the batch). + // The batch is ended before notifying the listeners, so that + // listeners can batch other cometd operations + _endBatch(true); + + // Notify the listeners after the status change but before the next connect + _notifyListeners('/meta/connect', message); + + // Connect was successful. + // Normally, the advice will say "reconnect: 'retry', interval: 0" + // and the server will hold the request, so when a response returns + // we immediately call the server again (long polling) + switch (action) + { + case 'retry': + _resetBackoff(); + _delayedConnect(); + break; + default: + _resetBackoff(); + _setStatus('disconnected'); + break; + } + } + else + { + _debug('Connect unsuccessful'); + + // Notify the listeners after the status change but before the next action + _notifyListeners('/meta/connect', message); + _notifyListeners('/meta/unsuccessful', message); + + // Connect was not successful. + // This may happen when the server crashed, the current clientId + // will be invalid, and the server will ask to handshake again + switch (action) + { + case 'retry': + _increaseBackoff(); + _delayedConnect(); + break; + case 'handshake': + // End the batch but do not deliver the messages until we connect successfully + _endBatch(false); + _resetBackoff(); + _delayedHandshake(); + break; + case 'none': + _resetBackoff(); + _setStatus('disconnected'); + break; + } + } + }; + + function _connectFailure(xhr, message) + { + _debug('Connect failure'); + + // Notify listeners + var failureMessage = { + successful: false, + failure: true, + channel: '/meta/connect', + request: message, + xhr: xhr, + advice: { + action: 'retry', + interval: _backoff + } + }; + _notifyListeners('/meta/connect', failureMessage); + _notifyListeners('/meta/unsuccessful', failureMessage); + + if (!_isDisconnected()) + { + var action = _advice.reconnect ? _advice.reconnect : 'retry'; + switch (action) + { + case 'retry': + _increaseBackoff(); + _debug('Connect failure, backing off and retrying in {} ms', _backoff); + _delayedConnect(); + break; + case 'handshake': + _resetBackoff(); + _delayedHandshake(); + break; + case 'none': + _resetBackoff(); + break; + default: + _debug('Unrecognized reconnect value: {}', action); + break; + } + } + }; + + function _disconnectSuccess(message) + { + if (message.successful) + { + _debug('Disconnect successful'); + _disconnect(false); + _notifyListeners('/meta/disconnect', message); + } + else + { + _debug('Disconnect unsuccessful'); + _disconnect(true); + _notifyListeners('/meta/disconnect', message); + _notifyListeners('/meta/usuccessful', message); + } + }; + + function _disconnect(abort) + { + _cancelDelayedSend(); + if (abort) _transport.abort(); + _clientId = null; + _setStatus('disconnected'); + _batch = 0; + _messageQueue = []; + _resetBackoff(); + }; + + function _disconnectFailure(xhr, message) + { + _debug('Disconnect failure'); + _disconnect(true); + + var failureMessage = { + successful: false, + failure: true, + channel: '/meta/disconnect', + request: message, + xhr: xhr, + advice: { + action: 'none', + interval: 0 + } + }; + _notifyListeners('/meta/disconnect', failureMessage); + _notifyListeners('/meta/unsuccessful', failureMessage); + }; + + function _subscribeSuccess(message) + { + if (message.successful) + { + _debug('Subscribe successful'); + _notifyListeners('/meta/subscribe', message); + } + else + { + _debug('Subscribe unsuccessful'); + _notifyListeners('/meta/subscribe', message); + _notifyListeners('/meta/unsuccessful', message); + } + }; + + function _subscribeFailure(xhr, message) + { + _debug('Subscribe failure'); + + var failureMessage = { + successful: false, + failure: true, + channel: '/meta/subscribe', + request: message, + xhr: xhr, + advice: { + action: 'none', + interval: 0 + } + }; + _notifyListeners('/meta/subscribe', failureMessage); + _notifyListeners('/meta/unsuccessful', failureMessage); + }; + + function _unsubscribeSuccess(message) + { + if (message.successful) + { + _debug('Unsubscribe successful'); + _notifyListeners('/meta/unsubscribe', message); + } + else + { + _debug('Unsubscribe unsuccessful'); + _notifyListeners('/meta/unsubscribe', message); + _notifyListeners('/meta/unsuccessful', message); + } + }; + + function _unsubscribeFailure(xhr, message) + { + _debug('Unsubscribe failure'); + + var failureMessage = { + successful: false, + failure: true, + channel: '/meta/unsubscribe', + request: message, + xhr: xhr, + advice: { + action: 'none', + interval: 0 + } + }; + _notifyListeners('/meta/unsubscribe', failureMessage); + _notifyListeners('/meta/unsuccessful', failureMessage); + }; + + function _messageSuccess(message) + { + if (message.successful === undefined) + { + if (message.data) + { + // It is a plain message, and not a bayeux meta message + _notifyListeners(message.channel, message); + } + else + { + _debug('Unknown message {}', JSON.stringify(message)); + } + } + else + { + if (message.successful) + { + _debug('Publish successful'); + _notifyListeners('/meta/publish', message); + } + else + { + _debug('Publish unsuccessful'); + _notifyListeners('/meta/publish', message); + _notifyListeners('/meta/unsuccessful', message); + } + } + }; + + function _messageFailure(xhr, message) + { + _debug('Publish failure'); + + var failureMessage = { + successful: false, + failure: true, + channel: message.channel, + request: message, + xhr: xhr, + advice: { + action: 'none', + interval: 0 + } + }; + _notifyListeners('/meta/publish', failureMessage); + _notifyListeners('/meta/unsuccessful', failureMessage); + }; + + function _notifyListeners(channel, message) + { + // Notify direct listeners + _notify(channel, message); + + // Notify the globbing listeners + var channelParts = channel.split("/"); + var last = channelParts.length - 1; + for (var i = last; i > 0; --i) + { + var channelPart = channelParts.slice(0, i).join('/') + '/*'; + // We don't want to notify /foo/* if the channel is /foo/bar/baz, + // so we stop at the first non recursive globbing + if (i == last) _notify(channelPart, message); + // Add the recursive globber and notify + channelPart += '*'; + _notify(channelPart, message); + } + }; + + function _notify(channel, message) + { + var subscriptions = _listeners[channel]; + if (subscriptions && subscriptions.length > 0) + { + for (var i = 0; i < subscriptions.length; ++i) + { + var subscription = subscriptions[i]; + // Subscriptions may come and go, so the array may have 'holes' + if (subscription) + { + try + { + _debug('Notifying subscription: channel \'{}\', callback \'{}\'', channel, subscription.callback.name); + subscription.callback.call(subscription.scope, message); + } + catch (x) + { + // Ignore exceptions from callbacks + _warn('Exception during execution of callback \'{}\' on channel \'{}\' for message {}, exception: {}', subscription.callback.name, channel, JSON.stringify(message), x); + } + } + } + } + }; + + function _resetBackoff() + { + _backoff = 0; + }; + + function _increaseBackoff() + { + if (_backoff < _maxBackoff) _backoff += _backoffIncrement; + }; + + var _error = this._error = function(text, args) + { + _log('error', _format.apply(this, arguments)); + }; + + var _warn = this._warn = function(text, args) + { + _log('warn', _format.apply(this, arguments)); + }; + + var _info = this._info = function(text, args) + { + _log('info', _format.apply(this, arguments)); + }; + + var _debug = this._debug = function(text, args) + { + _log('debug', _format.apply(this, arguments)); + }; + + function _log(level, text) + { + var priority = _logPriorities[level]; + var configPriority = _logPriorities[_logLevel]; + if (!configPriority) configPriority = _logPriorities['info']; + if (priority >= configPriority) + { + if (window.console) window.console.log(text); + } + }; + + function _format(text) + { + var braces = /\{\}/g; + var result = ''; + var start = 0; + var count = 0; + while (braces.test(text)) + { + result += text.substr(start, braces.lastIndex - start - 2); + var arg = arguments[++count]; + result += arg !== undefined ? arg : '{}'; + start = braces.lastIndex; + } + result += text.substr(start, text.length - start); + return result; + }; + + function newLongPollingTransport() + { + return $.extend({}, new Transport('long-polling'), new LongPollingTransport()); + }; + + function newCallbackPollingTransport() + { + return $.extend({}, new Transport('callback-polling'), new CallbackPollingTransport()); + }; + + /** + * Base object with the common functionality for transports. + * The key responsibility is to allow at most 2 outstanding requests to the server, + * to avoid that requests are sent behind a long poll. + * To achieve this, we have one reserved request for the long poll, and all other + * requests are serialized one after the other. + */ + var Transport = function(type) + { + var _maxRequests = 2; + var _requestIds = 0; + var _cometRequest = null; + var _requests = []; + var _packets = []; + + this.getType = function() + { + return type; + }; + + this.send = function(packet, comet) + { + if (comet) + _cometSend(this, packet); + else + _send(this, packet); + }; + + function _cometSend(self, packet) + { + if (_cometRequest !== null) throw 'Concurrent comet requests not allowed, request ' + _cometRequest.id + ' not yet completed'; + + var requestId = ++_requestIds; + _debug('Beginning comet request {}', requestId); + + var request = {id: requestId}; + _debug('Delivering comet request {}', requestId); + self.deliver(packet, request); + _cometRequest = request; + }; + + function _send(self, packet) + { + var requestId = ++_requestIds; + _debug('Beginning request {}, {} other requests, {} queued requests', requestId, _requests.length, _packets.length); + + var request = {id: requestId}; + // Consider the comet request which should always be present + if (_requests.length < _maxRequests - 1) + { + _debug('Delivering request {}', requestId); + self.deliver(packet, request); + _requests.push(request); + } + else + { + _packets.push([packet, request]); + _debug('Queued request {}, {} queued requests', requestId, _packets.length); + } + }; + + this.complete = function(request, success, comet) + { + if (comet) + _cometComplete(request); + else + _complete(this, request, success); + }; + + function _cometComplete(request) + { + var requestId = request.id; + if (_cometRequest !== request) throw 'Comet request mismatch, completing request ' + requestId; + + // Reset comet request + _cometRequest = null; + _debug('Ended comet request {}', requestId); + }; + + function _complete(self, request, success) + { + var requestId = request.id; + var index = $.inArray(request, _requests); + // The index can be negative the request has been aborted + if (index >= 0) _requests.splice(index, 1); + _debug('Ended request {}, {} other requests, {} queued requests', requestId, _requests.length, _packets.length); + + if (_packets.length > 0) + { + var packet = _packets.shift(); + if (success) + { + _debug('Dequeueing and sending request {}, {} queued requests', packet[1].id, _packets.length); + _send(self, packet[0]); + } + else + { + _debug('Dequeueing and failing request {}, {} queued requests', packet[1].id, _packets.length); + // Keep the semantic of calling response callbacks asynchronously after the request + setTimeout(function() { packet[0].onFailure(packet[1], 'error'); }, 0); + } + } + }; + + this.abort = function() + { + for (var i = 0; i < _requests.length; ++i) + { + var request = _requests[i]; + _debug('Aborting request {}', request.id); + if (request.xhr) request.xhr.abort(); + } + if (_cometRequest) + { + _debug('Aborting comet request {}', _cometRequest.id); + if (_cometRequest.xhr) _cometRequest.xhr.abort(); + } + _cometRequest = null; + _requests = []; + _packets = []; + }; + }; + + var LongPollingTransport = function() + { + this.deliver = function(packet, request) + { + request.xhr = $.ajax({ + url: packet.url, + type: 'POST', + contentType: 'text/json;charset=UTF-8', + beforeSend: function(xhr) + { + xhr.setRequestHeader('Connection', 'Keep-Alive'); + return true; + }, + data: JSON.stringify(packet.messages), + success: function(response) { packet.onSuccess(request, response); }, + error: function(xhr, reason, exception) { packet.onFailure(request, reason, exception); } + }); + }; + }; + + var CallbackPollingTransport = function() + { + var _maxLength = 2000; + this.deliver = function(packet, request) + { + // Microsoft Internet Explorer has a 2083 URL max length + // We must ensure that we stay within that length + var messages = JSON.stringify(packet.messages); + // Encode the messages because all brackets, quotes, commas, colons, etc + // present in the JSON will be URL encoded, taking many more characters + var urlLength = packet.url.length + encodeURI(messages).length; + _debug('URL length: {}', urlLength); + // Let's stay on the safe side and use 2000 instead of 2083 + // also because we did not count few characters among which + // the parameter name 'message' and the parameter 'jsonp', + // which sum up to about 50 chars + if (urlLength > _maxLength) + { + var x = packet.messages.length > 1 ? + 'Too many bayeux messages in the same batch resulting in message too big ' + + '(' + urlLength + ' bytes, max is ' + _maxLength + ') for transport ' + this.getType() : + 'Bayeux message too big (' + urlLength + ' bytes, max is ' + _maxLength + ') ' + + 'for transport ' + this.getType(); + // Keep the semantic of calling response callbacks asynchronously after the request + _setTimeout(function() { packet.onFailure(request, 'error', x); }, 0); + } + else + { + $.ajax({ + url: packet.url, + type: 'GET', + dataType: 'jsonp', + jsonp: 'jsonp', + beforeSend: function(xhr) + { + xhr.setRequestHeader('Connection', 'Keep-Alive'); + return true; + }, + data: + { + // In callback-polling, the content must be sent via the 'message' parameter + message: messages + }, + success: function(response) { packet.onSuccess(request, response); }, + error: function(xhr, reason, exception) { packet.onFailure(request, reason, exception); } + }); + } + }; + }; + }; + + /** + * The JS object that exposes the comet API to applications + */ + $.cometd = new $.Cometd(); // The default instance + +})(jQuery); diff --git a/plugins/Comet/updatetimeline.js b/plugins/Comet/updatetimeline.js index f4da1f47cd..6612b51168 100644 --- a/plugins/Comet/updatetimeline.js +++ b/plugins/Comet/updatetimeline.js @@ -1,3 +1,30 @@ // update the local timeline from a Comet server // +var updater = function() +{ + var _handshook = false; + var _connected = false; + var _cometd; + + return { + init: function() + { + _cometd = $.cometd; // Uses the default Comet object + _cometd.init(_timelineServer); + _cometd.subscribe(_timeline, this, receive); + $(window).unload(leave); + } + } + + function leave() + { + _cometd.disconnect(); + } + + function receive(message) + { + var noticeItem = makeNoticeItem(message.data); + var noticeList = $('ul.notices'); + } +}(); -- 2.39.5