]> git.mxchange.org Git - simgear.git/commitdiff
Expose more pipelining controls on HTTP code
authorJames Turner <zakalawe@mac.com>
Tue, 1 Mar 2016 12:44:22 +0000 (12:44 +0000)
committerJames Turner <zakalawe@mac.com>
Tue, 1 Mar 2016 12:44:22 +0000 (12:44 +0000)
- used for both implementations, restrict default pipeline depth to
  5 instead of 32 which was perhaps a little ambitious for some
  servers.

simgear/io/HTTPClient.cxx
simgear/io/HTTPClient.hxx
simgear/io/HTTPContentDecode.cxx
simgear/io/HTTPContentDecode.hxx

index 6ec17c49b8338c29c1230d8d3fd578defc3ed77f..39d0367966cf265347ab23cb44550ad37748eada 100644 (file)
@@ -68,7 +68,6 @@ namespace HTTP
 
 extern const int DEFAULT_HTTP_PORT = 80;
 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
-const unsigned int MAX_INFLIGHT_REQUESTS = 4;
 
 class Connection;
 typedef std::multimap<std::string, Connection*> ConnectionDict;
@@ -89,7 +88,10 @@ public:
         curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
-                          (long) MAX_INFLIGHT_REQUESTS);
+                          (long) maxPipelineDepth);
+        curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
+                          (long) maxHostConnections);
+
 
     }
 #else
@@ -103,11 +105,11 @@ public:
     int proxyPort;
     std::string proxyAuth;
     unsigned int maxConnections;
+    unsigned int maxHostConnections;
+    unsigned int maxPipelineDepth;
 
     RequestList pendingRequests;
 
-
-
     SGTimeStamp timeTransferSample;
     unsigned int bytesTransferred;
     unsigned int lastTransferRate;
@@ -122,7 +124,8 @@ public:
         client(pr),
         state(STATE_CLOSED),
         port(DEFAULT_HTTP_PORT),
-        connectionId(conId)
+        _connectionId(conId),
+        _maxPipelineLength(255)
     {
     }
 
@@ -139,7 +142,7 @@ public:
 
       // force the state to GETTING_BODY, to simplify logic in
       // responseComplete and handleClose
-      state = STATE_GETTING_BODY;
+        setState(STATE_GETTING_BODY);
       responseComplete();
     }
 
@@ -149,11 +152,16 @@ public:
         port = p;
     }
 
+    void setMaxPipelineLength(unsigned int m)
+    {
+        _maxPipelineLength = m;
+    }
+
     // socket-level errors
     virtual void handleError(int error)
     {
         const char* errStr = strerror(error);
-        SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
+        SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
                << errStr << ")");
 
         debugDumpRequests();
@@ -183,7 +191,7 @@ public:
             _contentDecoder.reset();
         }
 
-        state = STATE_SOCKET_ERROR;
+        setState(STATE_SOCKET_ERROR);
     }
 
     void handleTimeout()
@@ -198,12 +206,23 @@ public:
     // closing of the connection from the server side when getting the body,
         bool canCloseState = (state == STATE_GETTING_BODY);
         if (canCloseState && activeRequest) {
+            // check bodyTransferSize matches how much we actually transferred
+            if (bodyTransferSize > 0) {
+                if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
+                    SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
+                           << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
+                }
+            }
+
         // force state here, so responseComplete can avoid closing the
         // socket again
-            state =  STATE_CLOSED;
+            SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
+            setState(STATE_CLOSED);
             responseComplete();
         } else {
             if (state == STATE_WAITING_FOR_RESPONSE) {
+                SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
+                       << sentRequests.front()->url());
                 assert(!sentRequests.empty());
                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
                 // no active request, but don't restore the front sent one
@@ -223,7 +242,7 @@ public:
                 _contentDecoder.reset();
             }
 
-            state = STATE_CLOSED;
+            setState(STATE_CLOSED);
         }
 
       if (sentRequests.empty()) {
@@ -250,13 +269,14 @@ public:
 
         activeRequest = sentRequests.front();
         try {
+            SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
             activeRequest->responseStart(buffer);
         } catch (sg_exception& e) {
             handleError(EIO);
             return;
         }
 
-      state = STATE_GETTING_HEADERS;
+      setState(STATE_GETTING_HEADERS);
       buffer.clear();
       if (activeRequest->responseCode() == 204) {
         noMessageBody = true;
@@ -282,18 +302,19 @@ public:
         return;
       }
 
-      if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
+      if (sentRequests.size() >= _maxPipelineLength) {
         return;
       }
 
       if (state == STATE_CLOSED) {
           if (!connectToHost()) {
-
+              setState(STATE_SOCKET_ERROR);
               return;
           }
 
+          SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
           setTerminator("\r\n");
-          state = STATE_IDLE;
+          setState(STATE_IDLE);
       }
 
       Request_ptr r = queuedRequests.front();
@@ -373,12 +394,12 @@ public:
           }
         }
 
-        SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
+        SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
       // successfully sent, remove from queue, and maybe send the next
       queuedRequests.pop_front();
       sentRequests.push_back(r);
       if (state == STATE_IDLE) {
-          state = STATE_WAITING_FOR_RESPONSE;
+          setState(STATE_WAITING_FOR_RESPONSE);
       }
 
       // pipelining, let's maybe send the next request right away
@@ -420,7 +441,7 @@ public:
 
         case STATE_GETTING_CHUNKED_BYTES:
             setTerminator("\r\n");
-            state = STATE_GETTING_CHUNKED;
+            setState(STATE_GETTING_CHUNKED);
             buffer.clear();
             break;
 
@@ -466,7 +487,7 @@ public:
 
     bool shouldStartNext() const
     {
-      return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
+      return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
     }
 
     bool isActive() const
@@ -474,9 +495,14 @@ public:
         return !queuedRequests.empty() || !sentRequests.empty();
     }
 
+    std::string connectionId() const
+    {
+        return _connectionId;
+    }
+
     void debugDumpRequests() const
     {
-        SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
+        SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
                << "; state=" << state << ")");
         if (activeRequest) {
             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
@@ -493,6 +519,27 @@ public:
         }
     }
 private:
+    enum ConnectionState {
+        STATE_IDLE = 0,
+        STATE_WAITING_FOR_RESPONSE,
+        STATE_GETTING_HEADERS,
+        STATE_GETTING_BODY,
+        STATE_GETTING_CHUNKED,
+        STATE_GETTING_CHUNKED_BYTES,
+        STATE_GETTING_TRAILER,
+        STATE_SOCKET_ERROR,
+        STATE_CLOSED             ///< connection should be closed now
+    };
+
+    void setState(ConnectionState newState)
+    {
+        if (state == newState) {
+            return;
+        }
+
+        state = newState;
+    }
+
     bool connectToHost()
     {
         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
@@ -579,11 +626,11 @@ private:
 
         buffer.clear();
         if (chunkSize == 0) {  //  trailer start
-            state = STATE_GETTING_TRAILER;
+            setState(STATE_GETTING_TRAILER);
             return;
         }
 
-        state = STATE_GETTING_CHUNKED_BYTES;
+        setState(STATE_GETTING_CHUNKED_BYTES);
         setByteCount(chunkSize);
     }
 
@@ -605,15 +652,15 @@ private:
         _contentDecoder.initWithRequest(activeRequest);
 
         if (chunkedTransfer) {
-            state = STATE_GETTING_CHUNKED;
+            setState(STATE_GETTING_CHUNKED);
         } else if (noMessageBody || (bodyTransferSize == 0)) {
             // force the state to GETTING_BODY, to simplify logic in
             // responseComplete and handleClose
-            state = STATE_GETTING_BODY;
+            setState(STATE_GETTING_BODY);
             responseComplete();
         } else {
             setByteCount(bodyTransferSize); // may be -1, that's fine
-            state = STATE_GETTING_BODY;
+            setState(STATE_GETTING_BODY);
         }
     }
 
@@ -629,6 +676,7 @@ private:
 
         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
             if (doClose) {
+                SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
           // this will bring us into handleClose() above, which updates
           // state to STATE_CLOSED
               close();
@@ -639,7 +687,7 @@ private:
         }
 
         if (state != STATE_CLOSED)  {
-            state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
+            setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
         }
 
     // notify request after we change state, so this connection is idle
@@ -650,18 +698,6 @@ private:
         setTerminator("\r\n");
     }
 
-    enum ConnectionState {
-        STATE_IDLE = 0,
-        STATE_WAITING_FOR_RESPONSE,
-        STATE_GETTING_HEADERS,
-        STATE_GETTING_BODY,
-        STATE_GETTING_CHUNKED,
-        STATE_GETTING_CHUNKED_BYTES,
-        STATE_GETTING_TRAILER,
-        STATE_SOCKET_ERROR,
-        STATE_CLOSED             ///< connection should be closed now
-    };
-
     Client* client;
     Request_ptr activeRequest;
     ConnectionState state;
@@ -677,7 +713,8 @@ private:
     RequestList sentRequests;
 
     ContentDecoder _contentDecoder;
-    std::string connectionId;
+    std::string _connectionId;
+    unsigned int _maxPipelineLength;
 };
 #endif // of !ENABLE_CURL
 
@@ -686,11 +723,12 @@ Client::Client() :
 {
     d->proxyPort = 0;
     d->maxConnections = 4;
+    d->maxHostConnections = 4;
     d->bytesTransferred = 0;
     d->lastTransferRate = 0;
     d->timeTransferSample.stamp();
     d->totalBytesDownloaded = 0;
-
+    d->maxPipelineDepth = 5;
     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
 #if defined(ENABLE_CURL)
     static bool didInitCurlGlobal = false;
@@ -712,16 +750,33 @@ Client::~Client()
 
 void Client::setMaxConnections(unsigned int maxCon)
 {
-    if (maxCon < 1) {
-        throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
-    }
-
     d->maxConnections = maxCon;
 #if defined(ENABLE_CURL)
     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
 #endif
 }
 
+void Client::setMaxHostConnections(unsigned int maxHostCon)
+{
+    d->maxHostConnections = maxHostCon;
+#if defined(ENABLE_CURL)
+    curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
+#endif
+}
+
+void Client::setMaxPipelineDepth(unsigned int depth)
+{
+    d->maxPipelineDepth = depth;
+#if defined(ENABLE_CURL)
+    curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
+#else
+    ConnectionDict::iterator it = d->connections.begin();
+    for (; it != d->connections.end(); ) {
+        it->second->setMaxPipelineLength(depth);
+    }
+#endif
+}
+
 void Client::update(int waitTimeout)
 {
 #if defined(ENABLE_CURL)
@@ -938,25 +993,35 @@ void Client::makeRequest(const Request_ptr& r)
         }
     }
 
-    if (!con && atConnectionsLimit) {
+    bool atHostConnectionsLimit = (count >= d->maxHostConnections);
+
+    if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
         // all current connections are busy (active), and we don't
         // have free connections to allocate, so let's assign to
         // an existing one randomly. Ideally we'd used whichever one will
         // complete first but we don't have that info.
         int index = rand() % count;
-        for (it = d->connections.find(connectionId); index > 0; --index) { ; }
+        for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
         con = it->second;
     }
 
     // allocate a new connection object
     if (!con) {
-        con = new Connection(this, connectionId);
+        static int connectionSuffx = 0;
+
+        std::stringstream ss;
+        ss << connectionId << "-" << connectionSuffx++;
+
+        SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
+        con = new Connection(this, ss.str());
         con->setServer(host, port);
+        con->setMaxPipelineLength(d->maxPipelineDepth);
         d->poller.addChannel(con);
         d->connections.insert(d->connections.end(),
             ConnectionDict::value_type(connectionId, con));
     }
 
+    SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
     con->queueRequest(r);
 #endif
 }
index 69ea0f1dc9ba3bae37d60267c22abd5c2f5b4655..00682c4c7768d80ae0dd4077f83b1456d1e64c4d 100644 (file)
@@ -75,6 +75,13 @@ public:
      */
     void setMaxConnections(unsigned int maxCons);
 
+    void setMaxHostConnections(unsigned int maxHostConns);
+
+    /**
+     * maximum depth to pipeline requests - set to 0 to disable pipelining
+     */
+    void setMaxPipelineDepth(unsigned int depth);
+
     const std::string& userAgent() const;
 
     const std::string& proxyHost() const;
index d5fc519318e2921f1bda0083a655f348f7dd6ce7..34429b966262bf7f9f05b5924381e63cdd540911 100644 (file)
@@ -51,9 +51,9 @@ ContentDecoder::ContentDecoder() :
     _output(NULL),
     _zlib(NULL),
     _input(NULL),
-    _inputAllocated(0),
-    _inputSize(0)
+    _inputAllocated(0)
 {
+    reset();
 }
 
 ContentDecoder::~ContentDecoder()
@@ -82,6 +82,7 @@ void ContentDecoder::reset()
     _contentDeflate = false;
     _needGZipHeader = false;
     _inputSize = 0;
+    _totalReceivedBytes = 0;
 }
 
 void ContentDecoder::initWithRequest(Request_ptr req)
@@ -120,6 +121,7 @@ void ContentDecoder::finish()
 
 void ContentDecoder::receivedBytes(const char* n, size_t s)
 {
+    _totalReceivedBytes += s;
     if (!_contentDeflate) {
         _request->processBodyBytes(n, s);
         return;
index 1d329828f79bc74daa101bf9c38d7af12a59cee2..010b3c0c13c5d017760d71e17a835e69152ceefd 100644 (file)
@@ -49,6 +49,8 @@ public:
     
     void receivedBytes(const char* n, size_t s);
 
+    size_t getTotalReceivedBytes() const
+    { return _totalReceivedBytes; }
 private:
     bool consumeGZipHeader();
     void runDecoder();
@@ -63,6 +65,7 @@ private:
     unsigned char* _input;
     size_t _inputAllocated, _inputSize;
     bool _contentDeflate, _needGZipHeader;
+    size_t _totalReceivedBytes;
 };
 
 } // of namespace HTTP