X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=d5ef1ea5108039ff2dd8e01eea27f0141d69be03;hb=72bb9f4d5d6d861902a5779381e4ebe977db1df1;hp=03f13428d965f9265cfe6ea7bd35129b51994d87;hpb=f93fead8f277403bd1759bc20d995e8935c53bc9;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index 03f13428..d5ef1ea5 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -21,6 +21,7 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. // + #include "HTTPClient.hxx" #include "HTTPFileRequest.hxx" @@ -35,8 +36,16 @@ #include #include +#include + +#if defined(ENABLE_CURL) + #include +#else + #include +#endif + #include -#include + #include #include #include @@ -59,7 +68,6 @@ namespace HTTP extern const int DEFAULT_HTTP_PORT = 80; const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; -const unsigned int MAX_INFLIGHT_REQUESTS = 32; class Connection; typedef std::multimap ConnectionDict; @@ -68,34 +76,61 @@ typedef std::list RequestList; class Client::ClientPrivate { public: +#if defined(ENABLE_CURL) + CURLM* curlMulti; + + void createCurlMulti() + { + curlMulti = curl_multi_init(); + // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html + // we request HTTP 1.1 pipelining + curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, + (long) maxPipelineDepth); + curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, + (long) maxHostConnections); + + + } + + typedef std::map RequestCurlMap; + RequestCurlMap requests; +#else + NetChannelPoller poller; +// connections by host (potentially more than one) + ConnectionDict connections; +#endif + std::string userAgent; std::string proxy; int proxyPort; std::string proxyAuth; - NetChannelPoller poller; unsigned int maxConnections; - + unsigned int maxHostConnections; + unsigned int maxPipelineDepth; + RequestList pendingRequests; - -// connections by host (potentially more than one) - ConnectionDict connections; - + SGTimeStamp timeTransferSample; unsigned int bytesTransferred; unsigned int lastTransferRate; uint64_t totalBytesDownloaded; }; - + +#if !defined(ENABLE_CURL) class Connection : public NetChat { public: - Connection(Client* pr) : + Connection(Client* pr, const std::string& conId) : client(pr), state(STATE_CLOSED), - port(DEFAULT_HTTP_PORT) + port(DEFAULT_HTTP_PORT), + _connectionId(conId), + _maxPipelineLength(255) { } - + virtual ~Connection() { } @@ -109,64 +144,100 @@ public: // force the state to GETTING_BODY, to simplify logic in // responseComplete and handleClose - state = STATE_GETTING_BODY; + setState(STATE_GETTING_BODY); responseComplete(); } - + void setServer(const std::string& h, short p) { host = h; port = p; } - + + void setMaxPipelineLength(unsigned int m) + { + _maxPipelineLength = m; + } + // socket-level errors virtual void handleError(int error) { - if (error == ENOENT) { - // name lookup failure - // we won't have an active request yet, so the logic below won't - // fire to actually call setFailure. Let's fail all of the requests + const char* errStr = strerror(error); + SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " (" + << errStr << ")"); + + debugDumpRequests(); + + if (!activeRequest) + { + // connection level failure, eg name lookup or routing + // we won't have an active request yet, so let's fail all of the + // requests since we presume it's a systematic failure for + // the host in question BOOST_FOREACH(Request_ptr req, sentRequests) { - req->setFailure(error, "hostname lookup failure"); + req->setFailure(error, errStr); } - + BOOST_FOREACH(Request_ptr req, queuedRequests) { - req->setFailure(error, "hostname lookup failure"); + req->setFailure(error, errStr); } - - // name lookup failure, abandon all requests on this connection + sentRequests.clear(); queuedRequests.clear(); } - + NetChat::handleError(error); - if (activeRequest) { - SG_LOG(SG_IO, SG_INFO, "HTTP socket error"); - activeRequest->setFailure(error, "socket error"); + if (activeRequest) { + activeRequest->setFailure(error, errStr); activeRequest = NULL; _contentDecoder.reset(); } - - state = STATE_SOCKET_ERROR; + + setState(STATE_SOCKET_ERROR); } - + + void handleTimeout() + { + handleError(ETIMEDOUT); + } + virtual void handleClose() - { + { NetChat::handleClose(); // closing of the connection from the server side when getting the body, bool canCloseState = (state == STATE_GETTING_BODY); + bool isCancelling = (state == STATE_CANCELLING); + if (canCloseState && activeRequest) { - // force state here, so responseComplete can avoid closing the + // check bodyTransferSize matches how much we actually transferred + if (bodyTransferSize > 0) { + if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) { + SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url() + << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize); + } + } + + // force state here, so responseComplete can avoid closing the // socket again - state = STATE_CLOSED; + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url()); + setState(STATE_CLOSED); responseComplete(); } else { - if (activeRequest) { + if (state == STATE_WAITING_FOR_RESPONSE) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:" + << sentRequests.front()->url()); + assert(!sentRequests.empty()); + sentRequests.front()->setFailure(500, "server closed connection unexpectedly"); + // no active request, but don't restore the front sent one + sentRequests.erase(sentRequests.begin()); + } + + if (activeRequest && !isCancelling) { activeRequest->setFailure(500, "server closed connection"); - // remove the failed request from sentRequests, so it does + // remove the failed request from sentRequests, so it does // not get restored - RequestList::iterator it = std::find(sentRequests.begin(), + RequestList::iterator it = std::find(sentRequests.begin(), sentRequests.end(), activeRequest); if (it != sentRequests.end()) { sentRequests.erase(it); @@ -174,49 +245,75 @@ public: activeRequest = NULL; _contentDecoder.reset(); } - - state = STATE_CLOSED; + + setState(STATE_CLOSED); } - + if (sentRequests.empty()) { return; } - + // restore sent requests to the queue, so they will be re-sent // when the connection opens again queuedRequests.insert(queuedRequests.begin(), sentRequests.begin(), sentRequests.end()); sentRequests.clear(); } - - void handleTimeout() - { - NetChat::handleError(ETIMEDOUT); - if (activeRequest) { - SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout"); - activeRequest->setFailure(ETIMEDOUT, "socket timeout"); - activeRequest = NULL; - _contentDecoder.reset(); - } - - state = STATE_SOCKET_ERROR; - } - + void queueRequest(const Request_ptr& r) { queuedRequests.push_back(r); tryStartNextRequest(); } - + + void cancelRequest(const Request_ptr& r) + { + RequestList::iterator it = std::find(sentRequests.begin(), + sentRequests.end(), r); + if (it != sentRequests.end()) { + sentRequests.erase(it); + + if ((r == activeRequest) || !activeRequest) { + // either the cancelling request is active, or we're in waiting + // for response state - close now + setState(STATE_CANCELLING); + close(); + + setState(STATE_CLOSED); + activeRequest = NULL; + _contentDecoder.reset(); + } else if (activeRequest) { + SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url()); + + // has been sent but not active, let the active finish and + // then close. Otherwise cancelling request #2 would mess up + // active transfer #1 + activeRequest->setCloseAfterComplete(); + } + } // of request has been sent + + // simpler case, not sent yet just remove from the queue + it = std::find(queuedRequests.begin(), queuedRequests.end(), r); + if (it != queuedRequests.end()) { + queuedRequests.erase(it); + } + } + void beginResponse() { assert(!sentRequests.empty()); assert(state == STATE_WAITING_FOR_RESPONSE); - + activeRequest = sentRequests.front(); - - activeRequest->responseStart(buffer); - state = STATE_GETTING_HEADERS; + try { + SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url()); + activeRequest->responseStart(buffer); + } catch (sg_exception& e) { + handleError(EIO); + return; + } + + setState(STATE_GETTING_HEADERS); buffer.clear(); if (activeRequest->responseCode() == 204) { noMessageBody = true; @@ -230,7 +327,7 @@ public: chunkedTransfer = false; _contentDecoder.reset(); } - + void tryStartNextRequest() { while( !queuedRequests.empty() @@ -241,20 +338,22 @@ public: idleTime.stamp(); return; } - - if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + + if (sentRequests.size() >= _maxPipelineLength) { return; } - + if (state == STATE_CLOSED) { if (!connectToHost()) { + setState(STATE_SOCKET_ERROR); return; } - + + SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected."); setTerminator("\r\n"); - state = STATE_IDLE; + setState(STATE_IDLE); } - + Request_ptr r = queuedRequests.front(); r->requestStart(); @@ -263,7 +362,7 @@ public: assert(!path.empty()); std::string query = r->query(); std::string bodyData; - + if (!client->proxyHost().empty()) { path = r->scheme() + "://" + r->host() + r->path(); } @@ -281,7 +380,7 @@ public: headerData << "Content-Type:" << r->bodyType() << "\r\n"; } } - + headerData << "Host: " << r->hostAndPort() << "\r\n"; headerData << "User-Agent:" << client->userAgent() << "\r\n"; headerData << "Accept-Encoding: deflate, gzip\r\n"; @@ -297,7 +396,7 @@ public: if (!bodyData.empty()) { headerData << bodyData; } - + bool ok = push(headerData.str().c_str()); if (!ok) { SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket"); @@ -331,19 +430,19 @@ public: break; } } - - // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() << - // "\n\t @ " << reinterpret_cast(r.ptr()) << - // "\n\t on connection " << this); + + SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url()); // successfully sent, remove from queue, and maybe send the next queuedRequests.pop_front(); sentRequests.push_back(r); - state = STATE_WAITING_FOR_RESPONSE; - + if (state == STATE_IDLE) { + setState(STATE_WAITING_FOR_RESPONSE); + } + // pipelining, let's maybe send the next request right away tryStartNextRequest(); } - + virtual void collectIncomingData(const char* s, int n) { idleTime.stamp(); @@ -363,91 +462,140 @@ public: case STATE_WAITING_FOR_RESPONSE: beginResponse(); break; - + case STATE_GETTING_HEADERS: processHeader(); buffer.clear(); break; - + case STATE_GETTING_BODY: responseComplete(); break; - + case STATE_GETTING_CHUNKED: processChunkHeader(); break; - + case STATE_GETTING_CHUNKED_BYTES: setTerminator("\r\n"); - state = STATE_GETTING_CHUNKED; + setState(STATE_GETTING_CHUNKED); buffer.clear(); break; - + case STATE_GETTING_TRAILER: processTrailer(); buffer.clear(); break; - + case STATE_IDLE: SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?"); - + default: break; } } - + bool hasIdleTimeout() const { - if (state != STATE_IDLE) { + if ((state != STATE_IDLE) && (state != STATE_CLOSED)) { return false; } - + assert(sentRequests.empty()); - return idleTime.elapsedMSec() > 1000 * 10; // ten seconds + bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds + return isTimedOut; } - + bool hasErrorTimeout() const { - if (state == STATE_IDLE) { + if ((state == STATE_IDLE) || (state == STATE_CLOSED)) { return false; } - - return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds + + bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds + return isTimedOut; } - + bool hasError() const { return (state == STATE_SOCKET_ERROR); } - + bool shouldStartNext() const { - return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); + return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength); } - + bool isActive() const { return !queuedRequests.empty() || !sentRequests.empty(); } + + std::string connectionId() const + { + return _connectionId; + } + + void debugDumpRequests() const + { + SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId + << "; state=" << state << ")"); + if (activeRequest) { + SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url()); + } else { + SG_LOG(SG_IO, SG_DEBUG, "\tNo active request"); + } + + BOOST_FOREACH(Request_ptr req, sentRequests) { + SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url()); + } + + BOOST_FOREACH(Request_ptr req, queuedRequests) { + SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url()); + } + } private: + enum ConnectionState { + STATE_IDLE = 0, + STATE_WAITING_FOR_RESPONSE, + STATE_GETTING_HEADERS, + STATE_GETTING_BODY, + STATE_GETTING_CHUNKED, + STATE_GETTING_CHUNKED_BYTES, + STATE_GETTING_TRAILER, + STATE_SOCKET_ERROR, + STATE_CANCELLING, ///< cancelling an acitve request + STATE_CLOSED ///< connection should be closed now + }; + + void setState(ConnectionState newState) + { + if (state == newState) { + return; + } + + state = newState; + } + bool connectToHost() { SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port); - + if (!open()) { - SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); + SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); return false; } - + if (connect(host.c_str(), port) != 0) { + SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed"); return false; } - + return true; } - - + + void processHeader() { std::string h = strutils::simplify(buffer); @@ -455,18 +603,18 @@ private: headersComplete(); return; } - + int colonPos = buffer.find(':'); if (colonPos < 0) { SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); return; } - + std::string key = strutils::simplify(buffer.substr(0, colonPos)); std::string lkey = boost::to_lower_copy(key); std::string value = strutils::strip(buffer.substr(colonPos + 1)); - - // only consider these if getting headers (as opposed to trailers + + // only consider these if getting headers (as opposed to trailers // of a chunked transfer) if (state == STATE_GETTING_HEADERS) { if (lkey == "content-length") { @@ -484,10 +632,10 @@ private: _contentDecoder.setEncoding(value); } } - + activeRequest->responseHeader(lkey, value); } - + void processTransferEncoding(const std::string& te) { if (te == "chunked") { @@ -497,14 +645,14 @@ private: // failure } } - + void processChunkHeader() { if (buffer.empty()) { // blank line after chunk data return; } - + int chunkSize = 0; int semiPos = buffer.find(';'); if (semiPos >= 0) { @@ -513,93 +661,86 @@ private: } else { chunkSize = strutils::to_int(buffer, 16); } - + buffer.clear(); if (chunkSize == 0) { // trailer start - state = STATE_GETTING_TRAILER; + setState(STATE_GETTING_TRAILER); return; } - - state = STATE_GETTING_CHUNKED_BYTES; + + setState(STATE_GETTING_CHUNKED_BYTES); setByteCount(chunkSize); } - + void processTrailer() - { + { if (buffer.empty()) { // end of trailers responseComplete(); return; } - + // process as a normal header processHeader(); } - + void headersComplete() { activeRequest->responseHeadersComplete(); _contentDecoder.initWithRequest(activeRequest); - + + if (!activeRequest->serverSupportsPipelining()) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it"); + _maxPipelineLength = 1; + } + if (chunkedTransfer) { - state = STATE_GETTING_CHUNKED; + setState(STATE_GETTING_CHUNKED); } else if (noMessageBody || (bodyTransferSize == 0)) { // force the state to GETTING_BODY, to simplify logic in // responseComplete and handleClose - state = STATE_GETTING_BODY; + setState(STATE_GETTING_BODY); responseComplete(); } else { setByteCount(bodyTransferSize); // may be -1, that's fine - state = STATE_GETTING_BODY; + setState(STATE_GETTING_BODY); } } - + void responseComplete() { Request_ptr completedRequest = activeRequest; _contentDecoder.finish(); - + assert(sentRequests.front() == activeRequest); sentRequests.pop_front(); bool doClose = activeRequest->closeAfterComplete(); activeRequest = NULL; - + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { if (doClose) { + SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested"); // this will bring us into handleClose() above, which updates // state to STATE_CLOSED close(); - + // if we have additional requests waiting, try to start them now tryStartNextRequest(); } } - + if (state != STATE_CLOSED) { - state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE; + setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE); } - + // notify request after we change state, so this connection is idle // if completion triggers other requests (which is likely) - // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url()); completedRequest->responseComplete(); client->requestFinished(this); - + setTerminator("\r\n"); } - - enum ConnectionState { - STATE_IDLE = 0, - STATE_WAITING_FOR_RESPONSE, - STATE_GETTING_HEADERS, - STATE_GETTING_BODY, - STATE_GETTING_CHUNKED, - STATE_GETTING_CHUNKED_BYTES, - STATE_GETTING_TRAILER, - STATE_SOCKET_ERROR, - STATE_CLOSED ///< connection should be closed now - }; - + Client* client; Request_ptr activeRequest; ConnectionState state; @@ -610,48 +751,132 @@ private: SGTimeStamp idleTime; bool chunkedTransfer; bool noMessageBody; - + RequestList queuedRequests; RequestList sentRequests; - + ContentDecoder _contentDecoder; + std::string _connectionId; + unsigned int _maxPipelineLength; }; +#endif // of !ENABLE_CURL Client::Client() : d(new ClientPrivate) { d->proxyPort = 0; d->maxConnections = 4; + d->maxHostConnections = 4; d->bytesTransferred = 0; d->lastTransferRate = 0; d->timeTransferSample.stamp(); d->totalBytesDownloaded = 0; - + d->maxPipelineDepth = 5; setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); +#if defined(ENABLE_CURL) + static bool didInitCurlGlobal = false; + if (!didInitCurlGlobal) { + curl_global_init(CURL_GLOBAL_ALL); + didInitCurlGlobal = true; + } + + d->createCurlMulti(); +#endif } Client::~Client() { +#if defined(ENABLE_CURL) + curl_multi_cleanup(d->curlMulti); +#endif } void Client::setMaxConnections(unsigned int maxCon) { - if (maxCon < 1) { - throw sg_range_exception("illegal HTTP::Client::setMaxConnections value"); - } - d->maxConnections = maxCon; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon); +#endif +} + +void Client::setMaxHostConnections(unsigned int maxHostCon) +{ + d->maxHostConnections = maxHostCon; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon); +#endif +} + +void Client::setMaxPipelineDepth(unsigned int depth) +{ + d->maxPipelineDepth = depth; +#if defined(ENABLE_CURL) + curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ) { + it->second->setMaxPipelineLength(depth); + } +#endif } void Client::update(int waitTimeout) { - d->poller.poll(waitTimeout); +#if defined(ENABLE_CURL) + int remainingActive, messagesInQueue; + curl_multi_perform(d->curlMulti, &remainingActive); + + CURLMsg* msg; + while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) { + if (msg->msg == CURLMSG_DONE) { + Request* rawReq = 0; + CURL *e = msg->easy_handle; + curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq); + + // ensure request stays valid for the moment + // eg if responseComplete cancels us + Request_ptr req(rawReq); + + long responseCode; + curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode); + + // remove from the requests map now, + // in case the callbacks perform a cancel. We'll use + // the absence from the request dict in cancel to avoid + // a double remove + ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req); + assert(it != d->requests.end()); + assert(it->second == e); + d->requests.erase(it); + + if (msg->data.result == 0) { + req->responseComplete(); + } else { + SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result)); + req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result)); + } + + curl_multi_remove_handle(d->curlMulti, e); + curl_easy_cleanup(e); + } else { + // should never happen since CURLMSG_DONE is the only code + // defined! + SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg); + } + } // of curl message processing loop + SGTimeStamp::sleepForMSec(waitTimeout); +#else + if (!d->poller.hasChannels() && (waitTimeout > 0)) { + SGTimeStamp::sleepForMSec(waitTimeout); + } else { + d->poller.poll(waitTimeout); + } + bool waitingRequests = !d->pendingRequests.empty(); - ConnectionDict::iterator it = d->connections.begin(); for (; it != d->connections.end(); ) { Connection* con = it->second; - if (con->hasIdleTimeout() || + if (con->hasIdleTimeout() || con->hasError() || con->hasErrorTimeout() || (!con->isActive() && waitingRequests)) @@ -660,7 +885,7 @@ void Client::update(int waitTimeout) // tell the connection we're timing it out con->handleTimeout(); } - + // connection has been idle for a while, clean it up // (or if we have requests waiting for a different host, // or an error condition @@ -674,11 +899,11 @@ void Client::update(int waitTimeout) ++it; } } // of connection iteration - + if (waitingRequests && (d->connections.size() < d->maxConnections)) { RequestList waiting(d->pendingRequests); d->pendingRequests.clear(); - + // re-submit all waiting requests in order; this takes care of // finding multiple pending items targetted to the same (new) // connection @@ -686,6 +911,7 @@ void Client::update(int waitTimeout) makeRequest(req); } } +#endif } void Client::makeRequest(const Request_ptr& r) @@ -698,18 +924,100 @@ void Client::makeRequest(const Request_ptr& r) return; } + r->_client = this; + +#if defined(ENABLE_CURL) + ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r); + assert(rit == d->requests.end()); + + CURL* curlRequest = curl_easy_init(); + curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str()); + + d->requests[r] = curlRequest; + + curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get()); + // disable built-in libCurl progress feedback + curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1); + + curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback); + curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get()); + curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback); + curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get()); + + curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str()); + curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + + if (!d->proxy.empty()) { + curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str()); + curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort); + + if (!d->proxyAuth.empty()) { + curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC); + curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str()); + } + } + + std::string method = boost::to_lower_copy(r->method()); + if (method == "get") { + curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1); + } else if (method == "put") { + curl_easy_setopt(curlRequest, CURLOPT_PUT, 1); + curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1); + } else if (method == "post") { + // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html + curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1); + + std::string q = r->query().substr(1); + curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str()); + + // reset URL to exclude query pieces + std::string urlWithoutQuery = r->url(); + std::string::size_type queryPos = urlWithoutQuery.find('?'); + urlWithoutQuery.resize(queryPos); + curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str()); + } else { + curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str()); + } + + struct curl_slist* headerList = NULL; + if (r->hasBodyData() && (method != "post")) { + curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1); + curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength()); + curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback); + curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get()); + std::string h = "Content-Type:" + r->bodyType(); + headerList = curl_slist_append(headerList, h.c_str()); + } + + StringMap::const_iterator it; + for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) { + std::string h = it->first + ": " + it->second; + headerList = curl_slist_append(headerList, h.c_str()); + } + + if (headerList != NULL) { + curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList); + } + + curl_multi_add_handle(d->curlMulti, curlRequest); + +// this seems premature, but we don't have a callback from Curl we could +// use to trigger when the requst is actually sent. + r->requestStart(); + +#else if( r->url().find("http://") != 0 ) { r->setFailure(EINVAL, "only HTTP protocol is supported"); return; } - + std::string host = r->host(); int port = r->port(); if (!d->proxy.empty()) { host = d->proxy; port = d->proxyPort; } - + Connection* con = NULL; std::stringstream ss; ss << host << "-" << port; @@ -717,17 +1025,17 @@ void Client::makeRequest(const Request_ptr& r) bool havePending = !d->pendingRequests.empty(); bool atConnectionsLimit = d->connections.size() >= d->maxConnections; ConnectionDict::iterator consEnd = d->connections.end(); - + // assign request to an existing Connection. // various options exist here, examined in order ConnectionDict::iterator it = d->connections.find(connectionId); if (atConnectionsLimit && (it == consEnd)) { // maximum number of connections active, queue this request - // when a connection goes inactive, we'll start this one + // when a connection goes inactive, we'll start this one d->pendingRequests.push_back(r); return; } - + // scan for an idle Connection to the same host (likely if we're // retrieving multiple resources from the same host in quick succession) // if we have pending requests (waiting for a free Connection), then @@ -742,32 +1050,70 @@ void Client::makeRequest(const Request_ptr& r) break; } } - - if (!con && atConnectionsLimit) { + + bool atHostConnectionsLimit = (count >= d->maxHostConnections); + + if (!con && (atConnectionsLimit || atHostConnectionsLimit)) { // all current connections are busy (active), and we don't // have free connections to allocate, so let's assign to // an existing one randomly. Ideally we'd used whichever one will // complete first but we don't have that info. int index = rand() % count; - for (it = d->connections.find(connectionId); index > 0; --index) { ; } + for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; } con = it->second; } - + // allocate a new connection object if (!con) { - con = new Connection(this); + static int connectionSuffx = 0; + + std::stringstream ss; + ss << connectionId << "-" << connectionSuffx++; + + SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str()); + con = new Connection(this, ss.str()); con->setServer(host, port); + con->setMaxPipelineLength(d->maxPipelineDepth); d->poller.addChannel(con); - d->connections.insert(d->connections.end(), + d->connections.insert(d->connections.end(), ConnectionDict::value_type(connectionId, con)); } - + + SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId()); con->queueRequest(r); +#endif +} + +void Client::cancelRequest(const Request_ptr &r, std::string reason) +{ +#if defined(ENABLE_CURL) + ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r); + if(it == d->requests.end()) { + // already being removed, presumably inside ::update() + // nothing more to do + return; + } + + CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second); + assert(err == CURLM_OK); + + // clear the request pointer form the curl-easy object + curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0); + + curl_easy_cleanup(it->second); + d->requests.erase(it); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + (it->second)->cancelRequest(r); + } +#endif + r->setFailure(-1, reason); } //------------------------------------------------------------------------------ -FileRequestRef Client::urlretrieve( const std::string& url, - const std::string& filename ) +FileRequestRef Client::save( const std::string& url, + const std::string& filename ) { FileRequestRef req = new FileRequest(url, filename); makeRequest(req); @@ -775,7 +1121,7 @@ FileRequestRef Client::urlretrieve( const std::string& url, } //------------------------------------------------------------------------------ -MemoryRequestRef Client::urlload(const std::string& url) +MemoryRequestRef Client::load(const std::string& url) { MemoryRequestRef req = new MemoryRequest(url); makeRequest(req); @@ -784,7 +1130,7 @@ MemoryRequestRef Client::urlload(const std::string& url) void Client::requestFinished(Connection* con) { - + } void Client::setUserAgent(const std::string& ua) @@ -796,12 +1142,12 @@ const std::string& Client::userAgent() const { return d->userAgent; } - + const std::string& Client::proxyHost() const { return d->proxy; } - + const std::string& Client::proxyAuth() const { return d->proxyAuth; @@ -818,12 +1164,16 @@ void Client::setProxy( const std::string& proxy, bool Client::hasActiveRequests() const { + #if defined(ENABLE_CURL) + return !d->requests.empty(); + #else ConnectionDict::const_iterator it = d->connections.begin(); for (; it != d->connections.end(); ++it) { if (it->second->isActive()) return true; } - + return false; +#endif } void Client::receivedBytes(unsigned int count) @@ -831,7 +1181,7 @@ void Client::receivedBytes(unsigned int count) d->bytesTransferred += count; d->totalBytesDownloaded += count; } - + unsigned int Client::transferRateBytesPerSec() const { unsigned int e = d->timeTransferSample.elapsedMSec(); @@ -842,16 +1192,16 @@ unsigned int Client::transferRateBytesPerSec() const d->lastTransferRate = 0; return 0; } - + if (e < 100) { // avoid really narrow windows return d->lastTransferRate; } - + unsigned int ratio = (d->bytesTransferred * 1000) / e; // run a low-pass filter unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio); smoothed /= 400; - + d->timeTransferSample.stamp(); d->bytesTransferred = 0; d->lastTransferRate = smoothed; @@ -863,6 +1213,101 @@ uint64_t Client::totalBytesDownloaded() const return d->totalBytesDownloaded; } +size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) +{ + size_t byteSize = size * nmemb; + Request* req = static_cast(userdata); + req->processBodyBytes(ptr, byteSize); + + Client* cl = req->http(); + if (cl) { + cl->receivedBytes(byteSize); + } + + return byteSize; +} + +size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata) +{ + size_t maxBytes = size * nmemb; + Request* req = static_cast(userdata); + size_t actualBytes = req->getBodyData(ptr, 0, maxBytes); + return actualBytes; +} + +size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata) +{ + size_t byteSize = size * nitems; + Request* req = static_cast(userdata); + std::string h = strutils::simplify(std::string(rawBuffer, byteSize)); + + if (req->readyState() == HTTP::Request::OPENED) { + req->responseStart(h); + return byteSize; + } + + if (h.empty()) { + // got a 100-continue reponse; restart + if (req->responseCode() == 100) { + req->setReadyState(HTTP::Request::OPENED); + return byteSize; + } + + req->responseHeadersComplete(); + return byteSize; + } + + if (req->responseCode() == 100) { + return byteSize; // skip headers associated with 100-continue status + } + + size_t colonPos = h.find(':'); + if (colonPos == std::string::npos) { + SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); + return byteSize; + } + + std::string key = strutils::simplify(h.substr(0, colonPos)); + std::string lkey = boost::to_lower_copy(key); + std::string value = strutils::strip(h.substr(colonPos + 1)); + + req->responseHeader(lkey, value); + return byteSize; +} + +void Client::debugDumpRequests() +{ +#if defined(ENABLE_CURL) + SG_LOG(SG_IO, SG_INFO, "== HTTP request dump"); + ClientPrivate::RequestCurlMap::iterator it = d->requests.begin(); + for (; it != d->requests.end(); ++it) { + SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url()); + } + SG_LOG(SG_IO, SG_INFO, "=="); +#else + SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump"); + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + it->second->debugDumpRequests(); + } + SG_LOG(SG_IO, SG_INFO, "=="); +#endif +} + +void Client::clearAllConnections() +{ +#if defined(ENABLE_CURL) + curl_multi_cleanup(d->curlMulti); + d->createCurlMulti(); +#else + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + delete it->second; + } + d->connections.clear(); +#endif +} + } // of namespace HTTP } // of namespace simgear