From 49146f41e35e53f8d030cd85a8cbeb79b213cdfa Mon Sep 17 00:00:00 2001 From: James Turner Date: Tue, 1 Mar 2016 12:44:22 +0000 Subject: [PATCH] Expose more pipelining controls on HTTP code - used for both implementations, restrict default pipeline depth to 5 instead of 32 which was perhaps a little ambitious for some servers. --- simgear/io/HTTPClient.cxx | 157 ++++++++++++++++++++++--------- simgear/io/HTTPClient.hxx | 7 ++ simgear/io/HTTPContentDecode.cxx | 6 +- simgear/io/HTTPContentDecode.hxx | 3 + 4 files changed, 125 insertions(+), 48 deletions(-) diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index 6ec17c49..39d03679 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -68,7 +68,6 @@ namespace HTTP extern const int DEFAULT_HTTP_PORT = 80; const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; -const unsigned int MAX_INFLIGHT_REQUESTS = 4; class Connection; typedef std::multimap ConnectionDict; @@ -89,7 +88,10 @@ public: curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */); curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections); curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, - (long) MAX_INFLIGHT_REQUESTS); + (long) maxPipelineDepth); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, + (long) maxHostConnections); + } #else @@ -103,11 +105,11 @@ public: int proxyPort; std::string proxyAuth; unsigned int maxConnections; + unsigned int maxHostConnections; + unsigned int maxPipelineDepth; RequestList pendingRequests; - - SGTimeStamp timeTransferSample; unsigned int bytesTransferred; unsigned int lastTransferRate; @@ -122,7 +124,8 @@ public: client(pr), state(STATE_CLOSED), port(DEFAULT_HTTP_PORT), - connectionId(conId) + _connectionId(conId), + _maxPipelineLength(255) { } @@ -139,7 +142,7 @@ public: // force the state to GETTING_BODY, to simplify logic in // responseComplete and handleClose - state = STATE_GETTING_BODY; + setState(STATE_GETTING_BODY); responseComplete(); } @@ -149,11 +152,16 @@ public: port = p; } + void setMaxPipelineLength(unsigned int m) + { + _maxPipelineLength = m; + } + // socket-level errors virtual void handleError(int error) { const char* errStr = strerror(error); - SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " (" + SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " (" << errStr << ")"); debugDumpRequests(); @@ -183,7 +191,7 @@ public: _contentDecoder.reset(); } - state = STATE_SOCKET_ERROR; + setState(STATE_SOCKET_ERROR); } void handleTimeout() @@ -198,12 +206,23 @@ public: // closing of the connection from the server side when getting the body, bool canCloseState = (state == STATE_GETTING_BODY); if (canCloseState && activeRequest) { + // check bodyTransferSize matches how much we actually transferred + if (bodyTransferSize > 0) { + if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) { + SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url() + << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize); + } + } + // force state here, so responseComplete can avoid closing the // socket again - state = STATE_CLOSED; + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url()); + setState(STATE_CLOSED); responseComplete(); } else { if (state == STATE_WAITING_FOR_RESPONSE) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:" + << sentRequests.front()->url()); assert(!sentRequests.empty()); sentRequests.front()->setFailure(500, "server closed connection unexpectedly"); // no active request, but don't restore the front sent one @@ -223,7 +242,7 @@ public: _contentDecoder.reset(); } - state = STATE_CLOSED; + setState(STATE_CLOSED); } if (sentRequests.empty()) { @@ -250,13 +269,14 @@ public: activeRequest = sentRequests.front(); try { + SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url()); activeRequest->responseStart(buffer); } catch (sg_exception& e) { handleError(EIO); return; } - state = STATE_GETTING_HEADERS; + setState(STATE_GETTING_HEADERS); buffer.clear(); if (activeRequest->responseCode() == 204) { noMessageBody = true; @@ -282,18 +302,19 @@ public: return; } - if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + if (sentRequests.size() >= _maxPipelineLength) { return; } if (state == STATE_CLOSED) { if (!connectToHost()) { - + setState(STATE_SOCKET_ERROR); return; } + SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected."); setTerminator("\r\n"); - state = STATE_IDLE; + setState(STATE_IDLE); } Request_ptr r = queuedRequests.front(); @@ -373,12 +394,12 @@ public: } } - SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url()); + SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url()); // successfully sent, remove from queue, and maybe send the next queuedRequests.pop_front(); sentRequests.push_back(r); if (state == STATE_IDLE) { - state = STATE_WAITING_FOR_RESPONSE; + setState(STATE_WAITING_FOR_RESPONSE); } // pipelining, let's maybe send the next request right away @@ -420,7 +441,7 @@ public: case STATE_GETTING_CHUNKED_BYTES: setTerminator("\r\n"); - state = STATE_GETTING_CHUNKED; + setState(STATE_GETTING_CHUNKED); buffer.clear(); break; @@ -466,7 +487,7 @@ public: bool shouldStartNext() const { - return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); + return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength); } bool isActive() const @@ -474,9 +495,14 @@ public: return !queuedRequests.empty() || !sentRequests.empty(); } + std::string connectionId() const + { + return _connectionId; + } + void debugDumpRequests() const { - SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId + SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId << "; state=" << state << ")"); if (activeRequest) { SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url()); @@ -493,6 +519,27 @@ public: } } private: + enum ConnectionState { + STATE_IDLE = 0, + STATE_WAITING_FOR_RESPONSE, + STATE_GETTING_HEADERS, + STATE_GETTING_BODY, + STATE_GETTING_CHUNKED, + STATE_GETTING_CHUNKED_BYTES, + STATE_GETTING_TRAILER, + STATE_SOCKET_ERROR, + STATE_CLOSED ///< connection should be closed now + }; + + void setState(ConnectionState newState) + { + if (state == newState) { + return; + } + + state = newState; + } + bool connectToHost() { SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port); @@ -579,11 +626,11 @@ private: buffer.clear(); if (chunkSize == 0) { // trailer start - state = STATE_GETTING_TRAILER; + setState(STATE_GETTING_TRAILER); return; } - state = STATE_GETTING_CHUNKED_BYTES; + setState(STATE_GETTING_CHUNKED_BYTES); setByteCount(chunkSize); } @@ -605,15 +652,15 @@ private: _contentDecoder.initWithRequest(activeRequest); if (chunkedTransfer) { - state = STATE_GETTING_CHUNKED; + setState(STATE_GETTING_CHUNKED); } else if (noMessageBody || (bodyTransferSize == 0)) { // force the state to GETTING_BODY, to simplify logic in // responseComplete and handleClose - state = STATE_GETTING_BODY; + setState(STATE_GETTING_BODY); responseComplete(); } else { setByteCount(bodyTransferSize); // may be -1, that's fine - state = STATE_GETTING_BODY; + setState(STATE_GETTING_BODY); } } @@ -629,6 +676,7 @@ private: if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { if (doClose) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested"); // this will bring us into handleClose() above, which updates // state to STATE_CLOSED close(); @@ -639,7 +687,7 @@ private: } if (state != STATE_CLOSED) { - state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE; + setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE); } // notify request after we change state, so this connection is idle @@ -650,18 +698,6 @@ private: setTerminator("\r\n"); } - enum ConnectionState { - STATE_IDLE = 0, - STATE_WAITING_FOR_RESPONSE, - STATE_GETTING_HEADERS, - STATE_GETTING_BODY, - STATE_GETTING_CHUNKED, - STATE_GETTING_CHUNKED_BYTES, - STATE_GETTING_TRAILER, - STATE_SOCKET_ERROR, - STATE_CLOSED ///< connection should be closed now - }; - Client* client; Request_ptr activeRequest; ConnectionState state; @@ -677,7 +713,8 @@ private: RequestList sentRequests; ContentDecoder _contentDecoder; - std::string connectionId; + std::string _connectionId; + unsigned int _maxPipelineLength; }; #endif // of !ENABLE_CURL @@ -686,11 +723,12 @@ Client::Client() : { d->proxyPort = 0; d->maxConnections = 4; + d->maxHostConnections = 4; d->bytesTransferred = 0; d->lastTransferRate = 0; d->timeTransferSample.stamp(); d->totalBytesDownloaded = 0; - + d->maxPipelineDepth = 5; setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); #if defined(ENABLE_CURL) static bool didInitCurlGlobal = false; @@ -712,16 +750,33 @@ Client::~Client() void Client::setMaxConnections(unsigned int maxCon) { - if (maxCon < 1) { - throw sg_range_exception("illegal HTTP::Client::setMaxConnections value"); - } - d->maxConnections = maxCon; #if defined(ENABLE_CURL) curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon); #endif } +void Client::setMaxHostConnections(unsigned int maxHostCon) +{ + d->maxHostConnections = maxHostCon; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon); +#endif +} + +void Client::setMaxPipelineDepth(unsigned int depth) +{ + d->maxPipelineDepth = depth; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ) { + it->second->setMaxPipelineLength(depth); + } +#endif +} + void Client::update(int waitTimeout) { #if defined(ENABLE_CURL) @@ -938,25 +993,35 @@ void Client::makeRequest(const Request_ptr& r) } } - if (!con && atConnectionsLimit) { + bool atHostConnectionsLimit = (count >= d->maxHostConnections); + + if (!con && (atConnectionsLimit || atHostConnectionsLimit)) { // all current connections are busy (active), and we don't // have free connections to allocate, so let's assign to // an existing one randomly. Ideally we'd used whichever one will // complete first but we don't have that info. int index = rand() % count; - for (it = d->connections.find(connectionId); index > 0; --index) { ; } + for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; } con = it->second; } // allocate a new connection object if (!con) { - con = new Connection(this, connectionId); + static int connectionSuffx = 0; + + std::stringstream ss; + ss << connectionId << "-" << connectionSuffx++; + + SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str()); + con = new Connection(this, ss.str()); con->setServer(host, port); + con->setMaxPipelineLength(d->maxPipelineDepth); d->poller.addChannel(con); d->connections.insert(d->connections.end(), ConnectionDict::value_type(connectionId, con)); } + SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId()); con->queueRequest(r); #endif } diff --git a/simgear/io/HTTPClient.hxx b/simgear/io/HTTPClient.hxx index 69ea0f1d..00682c4c 100644 --- a/simgear/io/HTTPClient.hxx +++ b/simgear/io/HTTPClient.hxx @@ -75,6 +75,13 @@ public: */ void setMaxConnections(unsigned int maxCons); + void setMaxHostConnections(unsigned int maxHostConns); + + /** + * maximum depth to pipeline requests - set to 0 to disable pipelining + */ + void setMaxPipelineDepth(unsigned int depth); + const std::string& userAgent() const; const std::string& proxyHost() const; diff --git a/simgear/io/HTTPContentDecode.cxx b/simgear/io/HTTPContentDecode.cxx index d5fc5193..34429b96 100644 --- a/simgear/io/HTTPContentDecode.cxx +++ b/simgear/io/HTTPContentDecode.cxx @@ -51,9 +51,9 @@ ContentDecoder::ContentDecoder() : _output(NULL), _zlib(NULL), _input(NULL), - _inputAllocated(0), - _inputSize(0) + _inputAllocated(0) { + reset(); } ContentDecoder::~ContentDecoder() @@ -82,6 +82,7 @@ void ContentDecoder::reset() _contentDeflate = false; _needGZipHeader = false; _inputSize = 0; + _totalReceivedBytes = 0; } void ContentDecoder::initWithRequest(Request_ptr req) @@ -120,6 +121,7 @@ void ContentDecoder::finish() void ContentDecoder::receivedBytes(const char* n, size_t s) { + _totalReceivedBytes += s; if (!_contentDeflate) { _request->processBodyBytes(n, s); return; diff --git a/simgear/io/HTTPContentDecode.hxx b/simgear/io/HTTPContentDecode.hxx index 1d329828..010b3c0c 100644 --- a/simgear/io/HTTPContentDecode.hxx +++ b/simgear/io/HTTPContentDecode.hxx @@ -49,6 +49,8 @@ public: void receivedBytes(const char* n, size_t s); + size_t getTotalReceivedBytes() const + { return _totalReceivedBytes; } private: bool consumeGZipHeader(); void runDecoder(); @@ -63,6 +65,7 @@ private: unsigned char* _input; size_t _inputAllocated, _inputSize; bool _contentDeflate, _needGZipHeader; + size_t _totalReceivedBytes; }; } // of namespace HTTP -- 2.39.5