X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=d5ef1ea5108039ff2dd8e01eea27f0141d69be03;hb=8d93206dd33ed0079af6670a0ecd41a3b203d9a0;hp=7531a1011e15a62dcaabad0ed1cbec035e7f4cf0;hpb=4854a9e32025404e68865c61b95f2143d023c6b6;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index 7531a101..d5ef1ea5 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -1,21 +1,56 @@ +/** + * \file HTTPClient.cxx - simple HTTP client engine for SimHear + */ + +// Written by James Turner +// +// Copyright (C) 2013 James Turner +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +// + + #include "HTTPClient.hxx" +#include "HTTPFileRequest.hxx" #include #include +#include // rand() #include -#include +#include +#include +#include #include #include -#include +#include + +#if defined(ENABLE_CURL) + #include +#else + #include +#endif #include -#include + #include #include #include #include +#include #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H #include "version.h" @@ -25,10 +60,6 @@ # endif #endif -using std::string; -using std::stringstream; -using std::vector; - namespace simgear { @@ -37,102 +68,252 @@ namespace HTTP extern const int DEFAULT_HTTP_PORT = 80; const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; -const int MAX_INFLIGHT_REQUESTS = 32; -const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024; -const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS; - -// see http://www.ietf.org/rfc/rfc1952.txt for these values and -// detailed description of the logic -const int GZIP_HEADER_ID1 = 31; -const int GZIP_HEADER_ID2 = 139; -const int GZIP_HEADER_METHOD_DEFLATE = 8; -const int GZIP_HEADER_SIZE = 10; -const int GZIP_HEADER_FEXTRA = 1 << 2; -const int GZIP_HEADER_FNAME = 1 << 3; -const int GZIP_HEADER_COMMENT = 1 << 4; -const int GZIP_HEADER_CRC = 1 << 1; - + +class Connection; +typedef std::multimap ConnectionDict; +typedef std::list RequestList; + +class Client::ClientPrivate +{ +public: +#if defined(ENABLE_CURL) + CURLM* curlMulti; + + void createCurlMulti() + { + curlMulti = curl_multi_init(); + // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html + // we request HTTP 1.1 pipelining + curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, + (long) maxPipelineDepth); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, + (long) maxHostConnections); + + + } + + typedef std::map RequestCurlMap; + RequestCurlMap requests; +#else + NetChannelPoller poller; +// connections by host (potentially more than one) + ConnectionDict connections; +#endif + + std::string userAgent; + std::string proxy; + int proxyPort; + std::string proxyAuth; + unsigned int maxConnections; + unsigned int maxHostConnections; + unsigned int maxPipelineDepth; + + RequestList pendingRequests; + + SGTimeStamp timeTransferSample; + unsigned int bytesTransferred; + unsigned int lastTransferRate; + uint64_t totalBytesDownloaded; +}; + +#if !defined(ENABLE_CURL) class Connection : public NetChat { public: - Connection(Client* pr) : + Connection(Client* pr, const std::string& conId) : client(pr), state(STATE_CLOSED), port(DEFAULT_HTTP_PORT), - zlibInflateBuffer(NULL), - zlibInflateBufferSize(0), - zlibOutputBuffer(NULL) + _connectionId(conId), + _maxPipelineLength(255) { - } - + virtual ~Connection() { - if (zlibInflateBuffer) { - free(zlibInflateBuffer); - } - - if (zlibOutputBuffer) { - free(zlibOutputBuffer); - } } - - void setServer(const string& h, short p) + + virtual void handleBufferRead (NetBuffer& buffer) + { + if( !activeRequest || !activeRequest->isComplete() ) + return NetChat::handleBufferRead(buffer); + + // Request should be aborted (signaled by setting its state to complete). + + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose + setState(STATE_GETTING_BODY); + responseComplete(); + } + + void setServer(const std::string& h, short p) { host = h; port = p; } - + + void setMaxPipelineLength(unsigned int m) + { + _maxPipelineLength = m; + } + // socket-level errors virtual void handleError(int error) - { + { + const char* errStr = strerror(error); + SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " (" + << errStr << ")"); + + debugDumpRequests(); + + if (!activeRequest) + { + // connection level failure, eg name lookup or routing + // we won't have an active request yet, so let's fail all of the + // requests since we presume it's a systematic failure for + // the host in question + BOOST_FOREACH(Request_ptr req, sentRequests) { + req->setFailure(error, errStr); + } + + BOOST_FOREACH(Request_ptr req, queuedRequests) { + req->setFailure(error, errStr); + } + + sentRequests.clear(); + queuedRequests.clear(); + } + NetChat::handleError(error); if (activeRequest) { - SG_LOG(SG_IO, SG_INFO, "HTTP socket error"); - activeRequest->setFailure(error, "socket error"); + activeRequest->setFailure(error, errStr); activeRequest = NULL; + _contentDecoder.reset(); } - - state = STATE_SOCKET_ERROR; + + setState(STATE_SOCKET_ERROR); + } + + void handleTimeout() + { + handleError(ETIMEDOUT); } - + virtual void handleClose() - { + { NetChat::handleClose(); - - if ((state == STATE_GETTING_BODY) && activeRequest) { - // force state here, so responseComplete can avoid closing the + + // closing of the connection from the server side when getting the body, + bool canCloseState = (state == STATE_GETTING_BODY); + bool isCancelling = (state == STATE_CANCELLING); + + if (canCloseState && activeRequest) { + // check bodyTransferSize matches how much we actually transferred + if (bodyTransferSize > 0) { + if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) { + SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url() + << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize); + } + } + + // force state here, so responseComplete can avoid closing the // socket again - state = STATE_CLOSED; + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url()); + setState(STATE_CLOSED); responseComplete(); } else { - state = STATE_CLOSED; + if (state == STATE_WAITING_FOR_RESPONSE) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:" + << sentRequests.front()->url()); + assert(!sentRequests.empty()); + sentRequests.front()->setFailure(500, "server closed connection unexpectedly"); + // no active request, but don't restore the front sent one + sentRequests.erase(sentRequests.begin()); + } + + if (activeRequest && !isCancelling) { + activeRequest->setFailure(500, "server closed connection"); + // remove the failed request from sentRequests, so it does + // not get restored + RequestList::iterator it = std::find(sentRequests.begin(), + sentRequests.end(), activeRequest); + if (it != sentRequests.end()) { + sentRequests.erase(it); + } + activeRequest = NULL; + _contentDecoder.reset(); + } + + setState(STATE_CLOSED); } - + if (sentRequests.empty()) { return; } - + // restore sent requests to the queue, so they will be re-sent // when the connection opens again queuedRequests.insert(queuedRequests.begin(), sentRequests.begin(), sentRequests.end()); sentRequests.clear(); } - + void queueRequest(const Request_ptr& r) { - queuedRequests.push_back(r); - tryStartNextRequest(); + queuedRequests.push_back(r); + tryStartNextRequest(); + } + + void cancelRequest(const Request_ptr& r) + { + RequestList::iterator it = std::find(sentRequests.begin(), + sentRequests.end(), r); + if (it != sentRequests.end()) { + sentRequests.erase(it); + + if ((r == activeRequest) || !activeRequest) { + // either the cancelling request is active, or we're in waiting + // for response state - close now + setState(STATE_CANCELLING); + close(); + + setState(STATE_CLOSED); + activeRequest = NULL; + _contentDecoder.reset(); + } else if (activeRequest) { + SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url()); + + // has been sent but not active, let the active finish and + // then close. Otherwise cancelling request #2 would mess up + // active transfer #1 + activeRequest->setCloseAfterComplete(); + } + } // of request has been sent + + // simpler case, not sent yet just remove from the queue + it = std::find(queuedRequests.begin(), queuedRequests.end(), r); + if (it != queuedRequests.end()) { + queuedRequests.erase(it); + } } - + void beginResponse() { - assert(!sentRequests.empty()); - - activeRequest = sentRequests.front(); - activeRequest->responseStart(buffer); - state = STATE_GETTING_HEADERS; + assert(!sentRequests.empty()); + assert(state == STATE_WAITING_FOR_RESPONSE); + + activeRequest = sentRequests.front(); + try { + SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url()); + activeRequest->responseStart(buffer); + } catch (sg_exception& e) { + handleError(EIO); + return; + } + + setState(STATE_GETTING_HEADERS); buffer.clear(); if (activeRequest->responseCode() == 204) { noMessageBody = true; @@ -144,54 +325,62 @@ public: bodyTransferSize = -1; chunkedTransfer = false; - contentGZip = contentDeflate = false; + _contentDecoder.reset(); } - + void tryStartNextRequest() { + while( !queuedRequests.empty() + && queuedRequests.front()->isComplete() ) + queuedRequests.pop_front(); + if (queuedRequests.empty()) { idleTime.stamp(); return; } - - if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + + if (sentRequests.size() >= _maxPipelineLength) { return; } - + if (state == STATE_CLOSED) { if (!connectToHost()) { + setState(STATE_SOCKET_ERROR); return; } - + + SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected."); setTerminator("\r\n"); - state = STATE_IDLE; + setState(STATE_IDLE); } - + Request_ptr r = queuedRequests.front(); - requestBodyBytesToSend = r->requestBodyLength(); - - stringstream headerData; - string path = r->path(); - string query = r->query(); - string bodyData; - + r->requestStart(); + + std::stringstream headerData; + std::string path = r->path(); + assert(!path.empty()); + std::string query = r->query(); + std::string bodyData; + if (!client->proxyHost().empty()) { path = r->scheme() + "://" + r->host() + r->path(); } - if (r->method() == "POST") { + if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) { headerData << r->method() << " " << path << " HTTP/1.1\r\n"; bodyData = query.substr(1); // URL-encode, drop the leading '?' headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; headerData << "Content-Length:" << bodyData.size() << "\r\n"; } else { headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; - if (requestBodyBytesToSend >= 0) { - headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; - headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + if( r->hasBodyData() ) + { + headerData << "Content-Length:" << r->bodyLength() << "\r\n"; + headerData << "Content-Type:" << r->bodyType() << "\r\n"; } } - + headerData << "Host: " << r->hostAndPort() << "\r\n"; headerData << "User-Agent:" << client->userAgent() << "\r\n"; headerData << "Accept-Encoding: deflate, gzip\r\n"; @@ -199,298 +388,233 @@ public: headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; } - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; + BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) { + headerData << h.first << ": " << h.second << "\r\n"; } headerData << "\r\n"; // final CRLF to terminate the headers if (!bodyData.empty()) { headerData << bodyData; } - + bool ok = push(headerData.str().c_str()); if (!ok) { + SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket"); // we've over-stuffed the socket, give up for now, let things // drain down before trying to start any more requests. return; } - - while (requestBodyBytesToSend > 0) { - char buf[4096]; - int len = 4096; - r->getBodyData(buf, len); - if (len > 0) { - requestBodyBytesToSend -= len; - if (!bufferSend(buf, len)) { - SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); - state = STATE_SOCKET_ERROR; - return; + + if( r->hasBodyData() ) + for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();) + { + char buf[4096]; + size_t len = r->getBodyData(buf, body_bytes_sent, 4096); + if( len ) + { + if( !bufferSend(buf, len) ) + { + SG_LOG(SG_IO, + SG_WARN, + "overflow the HTTP::Connection output buffer"); + state = STATE_SOCKET_ERROR; + return; + } + body_bytes_sent += len; + } + else + { + SG_LOG(SG_IO, + SG_WARN, + "HTTP asynchronous request body generation is unsupported"); + break; } - } else { - SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported"); - break; } - } - - //std::cout << "did send request:" << r->url() << std::endl; - // successfully sent, remove from queue, and maybe send the next + + SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url()); + // successfully sent, remove from queue, and maybe send the next queuedRequests.pop_front(); sentRequests.push_back(r); - - // pipelining, let's maybe send the next request right away + if (state == STATE_IDLE) { + setState(STATE_WAITING_FOR_RESPONSE); + } + + // pipelining, let's maybe send the next request right away tryStartNextRequest(); } - + virtual void collectIncomingData(const char* s, int n) { idleTime.stamp(); - if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { - if (contentGZip || contentDeflate) { - expandCompressedData(s, n); - } else { - activeRequest->processBodyBytes(s, n); - } - } else { - buffer += string(s, n); - } - } + client->receivedBytes(static_cast(n)); - - void expandCompressedData(const char* s, int n) - { - int reqSize = n + zlib.avail_in; - if (reqSize > zlibInflateBufferSize) { - // reallocate - unsigned char* newBuf = (unsigned char*) malloc(reqSize); - memcpy(newBuf, zlib.next_in, zlib.avail_in); - memcpy(newBuf + zlib.avail_in, s, n); - free(zlibInflateBuffer); - zlibInflateBuffer = newBuf; - zlibInflateBufferSize = reqSize; - } else { - // important to use memmove here, since it's very likely - // the source and destination ranges overlap - memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in); - memcpy(zlibInflateBuffer + zlib.avail_in, s, n); - } - - zlib.next_in = (unsigned char*) zlibInflateBuffer; - zlib.avail_in = reqSize; - zlib.next_out = zlibOutputBuffer; - zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; - - if (contentGZip) { - // we clear this down to contentDeflate once the GZip header has been seen - if (reqSize < GZIP_HEADER_SIZE) { - return; // need more header bytes - } - - if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) || - (zlibInflateBuffer[1] != GZIP_HEADER_ID2) || - (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE)) - { - return; // invalid GZip header - } - - char flags = zlibInflateBuffer[3]; - int gzipHeaderSize = GZIP_HEADER_SIZE; - if (flags & GZIP_HEADER_FEXTRA) { - gzipHeaderSize += 2; - if (reqSize < gzipHeaderSize) { - return; // need more header bytes - } - - unsigned short extraHeaderBytes = *(reinterpret_cast(zlibInflateBuffer + GZIP_HEADER_FEXTRA)); - if ( sgIsBigEndian() ) { - sgEndianSwap( &extraHeaderBytes ); - } - - gzipHeaderSize += extraHeaderBytes; - if (reqSize < gzipHeaderSize) { - return; // need more header bytes - } - } - - if (flags & GZIP_HEADER_FNAME) { - gzipHeaderSize++; - while (gzipHeaderSize <= reqSize) { - if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { - break; // found terminating NULL character - } - } - } - - if (flags & GZIP_HEADER_COMMENT) { - gzipHeaderSize++; - while (gzipHeaderSize <= reqSize) { - if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { - break; // found terminating NULL character - } - } - } - - if (flags & GZIP_HEADER_CRC) { - gzipHeaderSize += 2; - } - - if (reqSize < gzipHeaderSize) { - return; // need more header bytes - } - - zlib.next_in += gzipHeaderSize; - zlib.avail_in = reqSize - gzipHeaderSize; - // now we've processed the GZip header, can decode as deflate - contentGZip = false; - contentDeflate = true; - } - - int writtenSize = 0; - do { - int result = inflate(&zlib, Z_NO_FLUSH); - if (result == Z_OK || result == Z_STREAM_END) { - - } else { - SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result); - return; - } - - writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out; - } while ((writtenSize == 0) && (zlib.avail_in > 0)); - - if (writtenSize > 0) { - activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize); - } + if( (state == STATE_GETTING_BODY) + || (state == STATE_GETTING_CHUNKED_BYTES) ) + _contentDecoder.receivedBytes(s, n); + else + buffer.append(s, n); } - + virtual void foundTerminator(void) { idleTime.stamp(); switch (state) { - case STATE_IDLE: + case STATE_WAITING_FOR_RESPONSE: beginResponse(); break; - + case STATE_GETTING_HEADERS: processHeader(); buffer.clear(); break; - + case STATE_GETTING_BODY: responseComplete(); break; - + case STATE_GETTING_CHUNKED: processChunkHeader(); break; - + case STATE_GETTING_CHUNKED_BYTES: setTerminator("\r\n"); - state = STATE_GETTING_CHUNKED; + setState(STATE_GETTING_CHUNKED); + buffer.clear(); break; - + case STATE_GETTING_TRAILER: processTrailer(); buffer.clear(); break; - + + case STATE_IDLE: + SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?"); + default: break; } } - + bool hasIdleTimeout() const { - if (state != STATE_IDLE) { + if ((state != STATE_IDLE) && (state != STATE_CLOSED)) { return false; } - - return idleTime.elapsedMSec() > 1000 * 10; // ten seconds + + assert(sentRequests.empty()); + bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds + return isTimedOut; } - + bool hasErrorTimeout() const { - if (state == STATE_IDLE) { + if ((state == STATE_IDLE) || (state == STATE_CLOSED)) { return false; } - - return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds + + bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds + return isTimedOut; } - + bool hasError() const { return (state == STATE_SOCKET_ERROR); } - + bool shouldStartNext() const { - return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); + return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength); + } + + bool isActive() const + { + return !queuedRequests.empty() || !sentRequests.empty(); + } + + std::string connectionId() const + { + return _connectionId; + } + + void debugDumpRequests() const + { + SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId + << "; state=" << state << ")"); + if (activeRequest) { + SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url()); + } else { + SG_LOG(SG_IO, SG_DEBUG, "\tNo active request"); + } + + BOOST_FOREACH(Request_ptr req, sentRequests) { + SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url()); + } + + BOOST_FOREACH(Request_ptr req, queuedRequests) { + SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url()); + } } private: + enum ConnectionState { + STATE_IDLE = 0, + STATE_WAITING_FOR_RESPONSE, + STATE_GETTING_HEADERS, + STATE_GETTING_BODY, + STATE_GETTING_CHUNKED, + STATE_GETTING_CHUNKED_BYTES, + STATE_GETTING_TRAILER, + STATE_SOCKET_ERROR, + STATE_CANCELLING, ///< cancelling an acitve request + STATE_CLOSED ///< connection should be closed now + }; + + void setState(ConnectionState newState) + { + if (state == newState) { + return; + } + + state = newState; + } + bool connectToHost() { - SG_LOG(SG_IO, SG_INFO, "HTTP connecting to " << host << ":" << port); - + SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port); + if (!open()) { - SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); + SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); return false; } - + if (connect(host.c_str(), port) != 0) { + SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed"); return false; } - + return true; } - - + + void processHeader() { - string h = strutils::simplify(buffer); + std::string h = strutils::simplify(buffer); if (h.empty()) { // blank line terminates headers headersComplete(); - - if (contentGZip || contentDeflate) { - memset(&zlib, 0, sizeof(z_stream)); - if (!zlibOutputBuffer) { - zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE); - } - - // NULLs means we'll get default alloc+free methods - // which is absolutely fine - zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; - zlib.next_out = zlibOutputBuffer; - if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) { - SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed"); - } - } - - if (chunkedTransfer) { - state = STATE_GETTING_CHUNKED; - } else if (noMessageBody || (bodyTransferSize == 0)) { - // force the state to GETTING_BODY, to simplify logic in - // responseComplete and handleClose - state = STATE_GETTING_BODY; - responseComplete(); - } else { - setByteCount(bodyTransferSize); // may be -1, that's fine - state = STATE_GETTING_BODY; - } - return; } - + int colonPos = buffer.find(':'); if (colonPos < 0) { SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); return; } - - string key = strutils::simplify(buffer.substr(0, colonPos)); - string lkey = boost::to_lower_copy(key); - string value = strutils::strip(buffer.substr(colonPos + 1)); - - // only consider these if getting headers (as opposed to trailers + + std::string key = strutils::simplify(buffer.substr(0, colonPos)); + std::string lkey = boost::to_lower_copy(key); + std::string value = strutils::strip(buffer.substr(colonPos + 1)); + + // only consider these if getting headers (as opposed to trailers // of a chunked transfer) if (state == STATE_GETTING_HEADERS) { if (lkey == "content-length") { @@ -505,20 +629,14 @@ private: } else if (lkey == "transfer-encoding") { processTransferEncoding(value); } else if (lkey == "content-encoding") { - if (value == "gzip") { - contentGZip = true; - } else if (value == "deflate") { - contentDeflate = true; - } else if (value != "identity") { - SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value); - } + _contentDecoder.setEncoding(value); } } - + activeRequest->responseHeader(lkey, value); } - - void processTransferEncoding(const string& te) + + void processTransferEncoding(const std::string& te) { if (te == "chunked") { chunkedTransfer = true; @@ -527,14 +645,14 @@ private: // failure } } - + void processChunkHeader() { if (buffer.empty()) { // blank line after chunk data return; } - + int chunkSize = 0; int semiPos = buffer.find(';'); if (semiPos >= 0) { @@ -543,166 +661,651 @@ private: } else { chunkSize = strutils::to_int(buffer, 16); } - + buffer.clear(); if (chunkSize == 0) { // trailer start - state = STATE_GETTING_TRAILER; + setState(STATE_GETTING_TRAILER); return; } - - state = STATE_GETTING_CHUNKED_BYTES; + + setState(STATE_GETTING_CHUNKED_BYTES); setByteCount(chunkSize); } - + void processTrailer() - { + { if (buffer.empty()) { // end of trailers responseComplete(); return; } - + // process as a normal header processHeader(); } - + void headersComplete() { activeRequest->responseHeadersComplete(); + _contentDecoder.initWithRequest(activeRequest); + + if (!activeRequest->serverSupportsPipelining()) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it"); + _maxPipelineLength = 1; + } + + if (chunkedTransfer) { + setState(STATE_GETTING_CHUNKED); + } else if (noMessageBody || (bodyTransferSize == 0)) { + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose + setState(STATE_GETTING_BODY); + responseComplete(); + } else { + setByteCount(bodyTransferSize); // may be -1, that's fine + setState(STATE_GETTING_BODY); + } } - + void responseComplete() { - //std::cout << "responseComplete:" << activeRequest->url() << std::endl; - activeRequest->responseComplete(); - client->requestFinished(this); - - if (contentDeflate) { - inflateEnd(&zlib); - } - + Request_ptr completedRequest = activeRequest; + _contentDecoder.finish(); + assert(sentRequests.front() == activeRequest); sentRequests.pop_front(); bool doClose = activeRequest->closeAfterComplete(); activeRequest = NULL; - + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { if (doClose) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested"); // this will bring us into handleClose() above, which updates // state to STATE_CLOSED - close(); - + close(); + // if we have additional requests waiting, try to start them now - tryStartNextRequest(); - } + tryStartNextRequest(); + } } - - if (state != STATE_CLOSED) { - state = STATE_IDLE; - } - setTerminator("\r\n"); + + if (state != STATE_CLOSED) { + setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE); + } + + // notify request after we change state, so this connection is idle + // if completion triggers other requests (which is likely) + completedRequest->responseComplete(); + client->requestFinished(this); + + setTerminator("\r\n"); } - - enum ConnectionState { - STATE_IDLE = 0, - STATE_GETTING_HEADERS, - STATE_GETTING_BODY, - STATE_GETTING_CHUNKED, - STATE_GETTING_CHUNKED_BYTES, - STATE_GETTING_TRAILER, - STATE_SOCKET_ERROR, - STATE_CLOSED ///< connection should be closed now - }; - + Client* client; Request_ptr activeRequest; ConnectionState state; - string host; + std::string host; short port; std::string buffer; int bodyTransferSize; SGTimeStamp idleTime; bool chunkedTransfer; bool noMessageBody; - int requestBodyBytesToSend; - - z_stream zlib; - unsigned char* zlibInflateBuffer; - int zlibInflateBufferSize; - unsigned char* zlibOutputBuffer; - bool contentGZip, contentDeflate; - - std::list queuedRequests; - std::list sentRequests; + + RequestList queuedRequests; + RequestList sentRequests; + + ContentDecoder _contentDecoder; + std::string _connectionId; + unsigned int _maxPipelineLength; }; +#endif // of !ENABLE_CURL -Client::Client() +Client::Client() : + d(new ClientPrivate) { + d->proxyPort = 0; + d->maxConnections = 4; + d->maxHostConnections = 4; + d->bytesTransferred = 0; + d->lastTransferRate = 0; + d->timeTransferSample.stamp(); + d->totalBytesDownloaded = 0; + d->maxPipelineDepth = 5; setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); +#if defined(ENABLE_CURL) + static bool didInitCurlGlobal = false; + if (!didInitCurlGlobal) { + curl_global_init(CURL_GLOBAL_ALL); + didInitCurlGlobal = true; + } + + d->createCurlMulti(); +#endif +} + +Client::~Client() +{ +#if defined(ENABLE_CURL) + curl_multi_cleanup(d->curlMulti); +#endif +} + +void Client::setMaxConnections(unsigned int maxCon) +{ + d->maxConnections = maxCon; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon); +#endif +} + +void Client::setMaxHostConnections(unsigned int maxHostCon) +{ + d->maxHostConnections = maxHostCon; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon); +#endif +} + +void Client::setMaxPipelineDepth(unsigned int depth) +{ + d->maxPipelineDepth = depth; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ) { + it->second->setMaxPipelineLength(depth); + } +#endif } void Client::update(int waitTimeout) { - NetChannel::poll(waitTimeout); - - ConnectionDict::iterator it = _connections.begin(); - for (; it != _connections.end(); ) { - if (it->second->hasIdleTimeout() || it->second->hasError() || - it->second->hasErrorTimeout()) +#if defined(ENABLE_CURL) + int remainingActive, messagesInQueue; + curl_multi_perform(d->curlMulti, &remainingActive); + + CURLMsg* msg; + while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) { + if (msg->msg == CURLMSG_DONE) { + Request* rawReq = 0; + CURL *e = msg->easy_handle; + curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq); + + // ensure request stays valid for the moment + // eg if responseComplete cancels us + Request_ptr req(rawReq); + + long responseCode; + curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode); + + // remove from the requests map now, + // in case the callbacks perform a cancel. We'll use + // the absence from the request dict in cancel to avoid + // a double remove + ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req); + assert(it != d->requests.end()); + assert(it->second == e); + d->requests.erase(it); + + if (msg->data.result == 0) { + req->responseComplete(); + } else { + SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result)); + req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result)); + } + + curl_multi_remove_handle(d->curlMulti, e); + curl_easy_cleanup(e); + } else { + // should never happen since CURLMSG_DONE is the only code + // defined! + SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg); + } + } // of curl message processing loop + SGTimeStamp::sleepForMSec(waitTimeout); +#else + if (!d->poller.hasChannels() && (waitTimeout > 0)) { + SGTimeStamp::sleepForMSec(waitTimeout); + } else { + d->poller.poll(waitTimeout); + } + + bool waitingRequests = !d->pendingRequests.empty(); + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ) { + Connection* con = it->second; + if (con->hasIdleTimeout() || + con->hasError() || + con->hasErrorTimeout() || + (!con->isActive() && waitingRequests)) { + if (con->hasErrorTimeout()) { + // tell the connection we're timing it out + con->handleTimeout(); + } + // connection has been idle for a while, clean it up - // (or has an error condition, again clean it up) + // (or if we have requests waiting for a different host, + // or an error condition ConnectionDict::iterator del = it++; delete del->second; - _connections.erase(del); + d->connections.erase(del); } else { if (it->second->shouldStartNext()) { - SG_LOG(SG_IO, SG_INFO, "should start next, hmm"); it->second->tryStartNextRequest(); } - ++it; } - } // of connecion iteration + } // of connection iteration + + if (waitingRequests && (d->connections.size() < d->maxConnections)) { + RequestList waiting(d->pendingRequests); + d->pendingRequests.clear(); + + // re-submit all waiting requests in order; this takes care of + // finding multiple pending items targetted to the same (new) + // connection + BOOST_FOREACH(Request_ptr req, waiting) { + makeRequest(req); + } + } +#endif } void Client::makeRequest(const Request_ptr& r) { - string host = r->host(); + if( r->isComplete() ) + return; + + if( r->url().find("://") == std::string::npos ) { + r->setFailure(EINVAL, "malformed URL"); + return; + } + + r->_client = this; + +#if defined(ENABLE_CURL) + ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r); + assert(rit == d->requests.end()); + + CURL* curlRequest = curl_easy_init(); + curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str()); + + d->requests[r] = curlRequest; + + curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get()); + // disable built-in libCurl progress feedback + curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1); + + curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback); + curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get()); + curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback); + curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get()); + + curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str()); + curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + + if (!d->proxy.empty()) { + curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str()); + curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort); + + if (!d->proxyAuth.empty()) { + curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC); + curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str()); + } + } + + std::string method = boost::to_lower_copy(r->method()); + if (method == "get") { + curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1); + } else if (method == "put") { + curl_easy_setopt(curlRequest, CURLOPT_PUT, 1); + curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1); + } else if (method == "post") { + // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html + curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1); + + std::string q = r->query().substr(1); + curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str()); + + // reset URL to exclude query pieces + std::string urlWithoutQuery = r->url(); + std::string::size_type queryPos = urlWithoutQuery.find('?'); + urlWithoutQuery.resize(queryPos); + curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str()); + } else { + curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str()); + } + + struct curl_slist* headerList = NULL; + if (r->hasBodyData() && (method != "post")) { + curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1); + curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength()); + curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback); + curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get()); + std::string h = "Content-Type:" + r->bodyType(); + headerList = curl_slist_append(headerList, h.c_str()); + } + + StringMap::const_iterator it; + for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) { + std::string h = it->first + ": " + it->second; + headerList = curl_slist_append(headerList, h.c_str()); + } + + if (headerList != NULL) { + curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList); + } + + curl_multi_add_handle(d->curlMulti, curlRequest); + +// this seems premature, but we don't have a callback from Curl we could +// use to trigger when the requst is actually sent. + r->requestStart(); + +#else + if( r->url().find("http://") != 0 ) { + r->setFailure(EINVAL, "only HTTP protocol is supported"); + return; + } + + std::string host = r->host(); int port = r->port(); - if (!_proxy.empty()) { - host = _proxy; - port = _proxyPort; + if (!d->proxy.empty()) { + host = d->proxy; + port = d->proxyPort; } - - stringstream ss; + + Connection* con = NULL; + std::stringstream ss; ss << host << "-" << port; - string connectionId = ss.str(); - - if (_connections.find(connectionId) == _connections.end()) { - Connection* con = new Connection(this); + std::string connectionId = ss.str(); + bool havePending = !d->pendingRequests.empty(); + bool atConnectionsLimit = d->connections.size() >= d->maxConnections; + ConnectionDict::iterator consEnd = d->connections.end(); + + // assign request to an existing Connection. + // various options exist here, examined in order + ConnectionDict::iterator it = d->connections.find(connectionId); + if (atConnectionsLimit && (it == consEnd)) { + // maximum number of connections active, queue this request + // when a connection goes inactive, we'll start this one + d->pendingRequests.push_back(r); + return; + } + + // scan for an idle Connection to the same host (likely if we're + // retrieving multiple resources from the same host in quick succession) + // if we have pending requests (waiting for a free Connection), then + // force new requests on this id to always use the first Connection + // (instead of the random selection below). This ensures that when + // there's pressure on the number of connections to keep alive, one + // host can't DoS every other. + int count = 0; + for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) { + if (havePending || !it->second->isActive()) { + con = it->second; + break; + } + } + + bool atHostConnectionsLimit = (count >= d->maxHostConnections); + + if (!con && (atConnectionsLimit || atHostConnectionsLimit)) { + // all current connections are busy (active), and we don't + // have free connections to allocate, so let's assign to + // an existing one randomly. Ideally we'd used whichever one will + // complete first but we don't have that info. + int index = rand() % count; + for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; } + con = it->second; + } + + // allocate a new connection object + if (!con) { + static int connectionSuffx = 0; + + std::stringstream ss; + ss << connectionId << "-" << connectionSuffx++; + + SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str()); + con = new Connection(this, ss.str()); con->setServer(host, port); - _connections[connectionId] = con; + con->setMaxPipelineLength(d->maxPipelineDepth); + d->poller.addChannel(con); + d->connections.insert(d->connections.end(), + ConnectionDict::value_type(connectionId, con)); } - - _connections[connectionId]->queueRequest(r); + + SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId()); + con->queueRequest(r); +#endif +} + +void Client::cancelRequest(const Request_ptr &r, std::string reason) +{ +#if defined(ENABLE_CURL) + ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r); + if(it == d->requests.end()) { + // already being removed, presumably inside ::update() + // nothing more to do + return; + } + + CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second); + assert(err == CURLM_OK); + + // clear the request pointer form the curl-easy object + curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0); + + curl_easy_cleanup(it->second); + d->requests.erase(it); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + (it->second)->cancelRequest(r); + } +#endif + r->setFailure(-1, reason); +} + +//------------------------------------------------------------------------------ +FileRequestRef Client::save( const std::string& url, + const std::string& filename ) +{ + FileRequestRef req = new FileRequest(url, filename); + makeRequest(req); + return req; +} + +//------------------------------------------------------------------------------ +MemoryRequestRef Client::load(const std::string& url) +{ + MemoryRequestRef req = new MemoryRequest(url); + makeRequest(req); + return req; } void Client::requestFinished(Connection* con) { - + +} + +void Client::setUserAgent(const std::string& ua) +{ + d->userAgent = ua; +} + +const std::string& Client::userAgent() const +{ + return d->userAgent; +} + +const std::string& Client::proxyHost() const +{ + return d->proxy; } -void Client::setUserAgent(const string& ua) +const std::string& Client::proxyAuth() const { - _userAgent = ua; + return d->proxyAuth; } -void Client::setProxy(const string& proxy, int port, const string& auth) +void Client::setProxy( const std::string& proxy, + int port, + const std::string& auth ) { - _proxy = proxy; - _proxyPort = port; - _proxyAuth = auth; + d->proxy = proxy; + d->proxyPort = port; + d->proxyAuth = auth; +} + +bool Client::hasActiveRequests() const +{ + #if defined(ENABLE_CURL) + return !d->requests.empty(); + #else + ConnectionDict::const_iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + if (it->second->isActive()) return true; + } + + return false; +#endif +} + +void Client::receivedBytes(unsigned int count) +{ + d->bytesTransferred += count; + d->totalBytesDownloaded += count; +} + +unsigned int Client::transferRateBytesPerSec() const +{ + unsigned int e = d->timeTransferSample.elapsedMSec(); + if (e > 400) { + // too long a window, ignore + d->timeTransferSample.stamp(); + d->bytesTransferred = 0; + d->lastTransferRate = 0; + return 0; + } + + if (e < 100) { // avoid really narrow windows + return d->lastTransferRate; + } + + unsigned int ratio = (d->bytesTransferred * 1000) / e; + // run a low-pass filter + unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio); + smoothed /= 400; + + d->timeTransferSample.stamp(); + d->bytesTransferred = 0; + d->lastTransferRate = smoothed; + return smoothed; +} + +uint64_t Client::totalBytesDownloaded() const +{ + return d->totalBytesDownloaded; +} + +size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) +{ + size_t byteSize = size * nmemb; + Request* req = static_cast(userdata); + req->processBodyBytes(ptr, byteSize); + + Client* cl = req->http(); + if (cl) { + cl->receivedBytes(byteSize); + } + + return byteSize; +} + +size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata) +{ + size_t maxBytes = size * nmemb; + Request* req = static_cast(userdata); + size_t actualBytes = req->getBodyData(ptr, 0, maxBytes); + return actualBytes; +} + +size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata) +{ + size_t byteSize = size * nitems; + Request* req = static_cast(userdata); + std::string h = strutils::simplify(std::string(rawBuffer, byteSize)); + + if (req->readyState() == HTTP::Request::OPENED) { + req->responseStart(h); + return byteSize; + } + + if (h.empty()) { + // got a 100-continue reponse; restart + if (req->responseCode() == 100) { + req->setReadyState(HTTP::Request::OPENED); + return byteSize; + } + + req->responseHeadersComplete(); + return byteSize; + } + + if (req->responseCode() == 100) { + return byteSize; // skip headers associated with 100-continue status + } + + size_t colonPos = h.find(':'); + if (colonPos == std::string::npos) { + SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); + return byteSize; + } + + std::string key = strutils::simplify(h.substr(0, colonPos)); + std::string lkey = boost::to_lower_copy(key); + std::string value = strutils::strip(h.substr(colonPos + 1)); + + req->responseHeader(lkey, value); + return byteSize; +} + +void Client::debugDumpRequests() +{ +#if defined(ENABLE_CURL) + SG_LOG(SG_IO, SG_INFO, "== HTTP request dump"); + ClientPrivate::RequestCurlMap::iterator it = d->requests.begin(); + for (; it != d->requests.end(); ++it) { + SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url()); + } + SG_LOG(SG_IO, SG_INFO, "=="); +#else + SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump"); + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + it->second->debugDumpRequests(); + } + SG_LOG(SG_IO, SG_INFO, "=="); +#endif +} + +void Client::clearAllConnections() +{ +#if defined(ENABLE_CURL) + curl_multi_cleanup(d->curlMulti); + d->createCurlMulti(); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + delete it->second; + } + d->connections.clear(); +#endif } } // of namespace HTTP