2 * \file HTTPClient.cxx - simple HTTP client engine for SimHear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
30 #include <cstdlib> // rand()
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
39 #include <simgear/simgear_config.h>
41 #if defined(ENABLE_CURL)
42 #include <curl/multi.h>
44 #include <simgear/io/HTTPContentDecode.hxx>
47 #include <simgear/io/sg_netChat.hxx>
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
58 # if !defined(SIMGEAR_VERSION)
59 # define SIMGEAR_VERSION "simgear-development"
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
73 typedef std::multimap<std::string, Connection*> ConnectionDict;
74 typedef std::list<Request_ptr> RequestList;
76 class Client::ClientPrivate
79 #if defined(ENABLE_CURL)
82 void createCurlMulti()
84 curlMulti = curl_multi_init();
85 // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
86 // we request HTTP 1.1 pipelining
87 curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
88 curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
89 curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
90 (long) maxPipelineDepth);
91 curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
92 (long) maxHostConnections);
97 typedef std::map<Request_ptr, CURL*> RequestCurlMap;
98 RequestCurlMap requests;
100 NetChannelPoller poller;
101 // connections by host (potentially more than one)
102 ConnectionDict connections;
105 std::string userAgent;
108 std::string proxyAuth;
109 unsigned int maxConnections;
110 unsigned int maxHostConnections;
111 unsigned int maxPipelineDepth;
113 RequestList pendingRequests;
115 SGTimeStamp timeTransferSample;
116 unsigned int bytesTransferred;
117 unsigned int lastTransferRate;
118 uint64_t totalBytesDownloaded;
121 #if !defined(ENABLE_CURL)
122 class Connection : public NetChat
125 Connection(Client* pr, const std::string& conId) :
128 port(DEFAULT_HTTP_PORT),
129 _connectionId(conId),
130 _maxPipelineLength(255)
134 virtual ~Connection()
138 virtual void handleBufferRead (NetBuffer& buffer)
140 if( !activeRequest || !activeRequest->isComplete() )
141 return NetChat::handleBufferRead(buffer);
143 // Request should be aborted (signaled by setting its state to complete).
145 // force the state to GETTING_BODY, to simplify logic in
146 // responseComplete and handleClose
147 setState(STATE_GETTING_BODY);
151 void setServer(const std::string& h, short p)
157 void setMaxPipelineLength(unsigned int m)
159 _maxPipelineLength = m;
162 // socket-level errors
163 virtual void handleError(int error)
165 const char* errStr = strerror(error);
166 SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
173 // connection level failure, eg name lookup or routing
174 // we won't have an active request yet, so let's fail all of the
175 // requests since we presume it's a systematic failure for
176 // the host in question
177 BOOST_FOREACH(Request_ptr req, sentRequests) {
178 req->setFailure(error, errStr);
181 BOOST_FOREACH(Request_ptr req, queuedRequests) {
182 req->setFailure(error, errStr);
185 sentRequests.clear();
186 queuedRequests.clear();
189 NetChat::handleError(error);
191 activeRequest->setFailure(error, errStr);
192 activeRequest = NULL;
193 _contentDecoder.reset();
196 setState(STATE_SOCKET_ERROR);
201 handleError(ETIMEDOUT);
204 virtual void handleClose()
206 NetChat::handleClose();
208 // closing of the connection from the server side when getting the body,
209 bool canCloseState = (state == STATE_GETTING_BODY);
210 bool isCancelling = (state == STATE_CANCELLING);
212 if (canCloseState && activeRequest) {
213 // check bodyTransferSize matches how much we actually transferred
214 if (bodyTransferSize > 0) {
215 if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
216 SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
217 << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
221 // force state here, so responseComplete can avoid closing the
223 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
224 setState(STATE_CLOSED);
227 if (state == STATE_WAITING_FOR_RESPONSE) {
228 SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
229 << sentRequests.front()->url());
230 assert(!sentRequests.empty());
231 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
232 // no active request, but don't restore the front sent one
233 sentRequests.erase(sentRequests.begin());
236 if (activeRequest && !isCancelling) {
237 activeRequest->setFailure(500, "server closed connection");
238 // remove the failed request from sentRequests, so it does
240 RequestList::iterator it = std::find(sentRequests.begin(),
241 sentRequests.end(), activeRequest);
242 if (it != sentRequests.end()) {
243 sentRequests.erase(it);
245 activeRequest = NULL;
246 _contentDecoder.reset();
249 setState(STATE_CLOSED);
252 if (sentRequests.empty()) {
256 // restore sent requests to the queue, so they will be re-sent
257 // when the connection opens again
258 queuedRequests.insert(queuedRequests.begin(),
259 sentRequests.begin(), sentRequests.end());
260 sentRequests.clear();
263 void queueRequest(const Request_ptr& r)
265 queuedRequests.push_back(r);
266 tryStartNextRequest();
269 void cancelRequest(const Request_ptr& r)
271 RequestList::iterator it = std::find(sentRequests.begin(),
272 sentRequests.end(), r);
273 if (it != sentRequests.end()) {
274 sentRequests.erase(it);
276 if ((r == activeRequest) || !activeRequest) {
277 // either the cancelling request is active, or we're in waiting
278 // for response state - close now
279 setState(STATE_CANCELLING);
282 setState(STATE_CLOSED);
283 activeRequest = NULL;
284 _contentDecoder.reset();
285 } else if (activeRequest) {
286 SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url());
288 // has been sent but not active, let the active finish and
289 // then close. Otherwise cancelling request #2 would mess up
290 // active transfer #1
291 activeRequest->setCloseAfterComplete();
293 } // of request has been sent
295 // simpler case, not sent yet just remove from the queue
296 it = std::find(queuedRequests.begin(), queuedRequests.end(), r);
297 if (it != queuedRequests.end()) {
298 queuedRequests.erase(it);
304 assert(!sentRequests.empty());
305 assert(state == STATE_WAITING_FOR_RESPONSE);
307 activeRequest = sentRequests.front();
309 SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
310 activeRequest->responseStart(buffer);
311 } catch (sg_exception& e) {
316 setState(STATE_GETTING_HEADERS);
318 if (activeRequest->responseCode() == 204) {
319 noMessageBody = true;
320 } else if (activeRequest->method() == "HEAD") {
321 noMessageBody = true;
323 noMessageBody = false;
326 bodyTransferSize = -1;
327 chunkedTransfer = false;
328 _contentDecoder.reset();
331 void tryStartNextRequest()
333 while( !queuedRequests.empty()
334 && queuedRequests.front()->isComplete() )
335 queuedRequests.pop_front();
337 if (queuedRequests.empty()) {
342 if (sentRequests.size() >= _maxPipelineLength) {
346 if (state == STATE_CLOSED) {
347 if (!connectToHost()) {
348 setState(STATE_SOCKET_ERROR);
352 SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
353 setTerminator("\r\n");
354 setState(STATE_IDLE);
357 Request_ptr r = queuedRequests.front();
360 std::stringstream headerData;
361 std::string path = r->path();
362 assert(!path.empty());
363 std::string query = r->query();
364 std::string bodyData;
366 if (!client->proxyHost().empty()) {
367 path = r->scheme() + "://" + r->host() + r->path();
370 if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
371 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
372 bodyData = query.substr(1); // URL-encode, drop the leading '?'
373 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
374 headerData << "Content-Length:" << bodyData.size() << "\r\n";
376 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
377 if( r->hasBodyData() )
379 headerData << "Content-Length:" << r->bodyLength() << "\r\n";
380 headerData << "Content-Type:" << r->bodyType() << "\r\n";
384 headerData << "Host: " << r->hostAndPort() << "\r\n";
385 headerData << "User-Agent:" << client->userAgent() << "\r\n";
386 headerData << "Accept-Encoding: deflate, gzip\r\n";
387 if (!client->proxyAuth().empty()) {
388 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
391 BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
392 headerData << h.first << ": " << h.second << "\r\n";
395 headerData << "\r\n"; // final CRLF to terminate the headers
396 if (!bodyData.empty()) {
397 headerData << bodyData;
400 bool ok = push(headerData.str().c_str());
402 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
403 // we've over-stuffed the socket, give up for now, let things
404 // drain down before trying to start any more requests.
408 if( r->hasBodyData() )
409 for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
412 size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
415 if( !bufferSend(buf, len) )
419 "overflow the HTTP::Connection output buffer");
420 state = STATE_SOCKET_ERROR;
423 body_bytes_sent += len;
429 "HTTP asynchronous request body generation is unsupported");
434 SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
435 // successfully sent, remove from queue, and maybe send the next
436 queuedRequests.pop_front();
437 sentRequests.push_back(r);
438 if (state == STATE_IDLE) {
439 setState(STATE_WAITING_FOR_RESPONSE);
442 // pipelining, let's maybe send the next request right away
443 tryStartNextRequest();
446 virtual void collectIncomingData(const char* s, int n)
449 client->receivedBytes(static_cast<unsigned int>(n));
451 if( (state == STATE_GETTING_BODY)
452 || (state == STATE_GETTING_CHUNKED_BYTES) )
453 _contentDecoder.receivedBytes(s, n);
458 virtual void foundTerminator(void)
462 case STATE_WAITING_FOR_RESPONSE:
466 case STATE_GETTING_HEADERS:
471 case STATE_GETTING_BODY:
475 case STATE_GETTING_CHUNKED:
476 processChunkHeader();
479 case STATE_GETTING_CHUNKED_BYTES:
480 setTerminator("\r\n");
481 setState(STATE_GETTING_CHUNKED);
486 case STATE_GETTING_TRAILER:
492 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
499 bool hasIdleTimeout() const
501 if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
505 assert(sentRequests.empty());
506 bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
510 bool hasErrorTimeout() const
512 if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
516 bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
520 bool hasError() const
522 return (state == STATE_SOCKET_ERROR);
525 bool shouldStartNext() const
527 return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
530 bool isActive() const
532 return !queuedRequests.empty() || !sentRequests.empty();
535 std::string connectionId() const
537 return _connectionId;
540 void debugDumpRequests() const
542 SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
543 << "; state=" << state << ")");
545 SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
547 SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
550 BOOST_FOREACH(Request_ptr req, sentRequests) {
551 SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
554 BOOST_FOREACH(Request_ptr req, queuedRequests) {
555 SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
559 enum ConnectionState {
561 STATE_WAITING_FOR_RESPONSE,
562 STATE_GETTING_HEADERS,
564 STATE_GETTING_CHUNKED,
565 STATE_GETTING_CHUNKED_BYTES,
566 STATE_GETTING_TRAILER,
568 STATE_CANCELLING, ///< cancelling an acitve request
569 STATE_CLOSED ///< connection should be closed now
572 void setState(ConnectionState newState)
574 if (state == newState) {
583 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
586 SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
590 if (connect(host.c_str(), port) != 0) {
591 SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
601 std::string h = strutils::simplify(buffer);
602 if (h.empty()) { // blank line terminates headers
607 int colonPos = buffer.find(':');
609 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
613 std::string key = strutils::simplify(buffer.substr(0, colonPos));
614 std::string lkey = boost::to_lower_copy(key);
615 std::string value = strutils::strip(buffer.substr(colonPos + 1));
617 // only consider these if getting headers (as opposed to trailers
618 // of a chunked transfer)
619 if (state == STATE_GETTING_HEADERS) {
620 if (lkey == "content-length") {
622 int sz = strutils::to_int(value);
623 if (bodyTransferSize <= 0) {
624 bodyTransferSize = sz;
626 activeRequest->setResponseLength(sz);
627 } else if (lkey == "transfer-length") {
628 bodyTransferSize = strutils::to_int(value);
629 } else if (lkey == "transfer-encoding") {
630 processTransferEncoding(value);
631 } else if (lkey == "content-encoding") {
632 _contentDecoder.setEncoding(value);
636 activeRequest->responseHeader(lkey, value);
639 void processTransferEncoding(const std::string& te)
641 if (te == "chunked") {
642 chunkedTransfer = true;
644 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
649 void processChunkHeader()
651 if (buffer.empty()) {
652 // blank line after chunk data
657 int semiPos = buffer.find(';');
659 // extensions ignored for the moment
660 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
662 chunkSize = strutils::to_int(buffer, 16);
666 if (chunkSize == 0) { // trailer start
667 setState(STATE_GETTING_TRAILER);
671 setState(STATE_GETTING_CHUNKED_BYTES);
672 setByteCount(chunkSize);
675 void processTrailer()
677 if (buffer.empty()) {
683 // process as a normal header
687 void headersComplete()
689 activeRequest->responseHeadersComplete();
690 _contentDecoder.initWithRequest(activeRequest);
692 if (!activeRequest->serverSupportsPipelining()) {
693 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
694 _maxPipelineLength = 1;
697 if (chunkedTransfer) {
698 setState(STATE_GETTING_CHUNKED);
699 } else if (noMessageBody || (bodyTransferSize == 0)) {
700 // force the state to GETTING_BODY, to simplify logic in
701 // responseComplete and handleClose
702 setState(STATE_GETTING_BODY);
705 setByteCount(bodyTransferSize); // may be -1, that's fine
706 setState(STATE_GETTING_BODY);
710 void responseComplete()
712 Request_ptr completedRequest = activeRequest;
713 _contentDecoder.finish();
715 assert(sentRequests.front() == activeRequest);
716 sentRequests.pop_front();
717 bool doClose = activeRequest->closeAfterComplete();
718 activeRequest = NULL;
720 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
722 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
723 // this will bring us into handleClose() above, which updates
724 // state to STATE_CLOSED
727 // if we have additional requests waiting, try to start them now
728 tryStartNextRequest();
732 if (state != STATE_CLOSED) {
733 setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
736 // notify request after we change state, so this connection is idle
737 // if completion triggers other requests (which is likely)
738 completedRequest->responseComplete();
739 client->requestFinished(this);
741 setTerminator("\r\n");
745 Request_ptr activeRequest;
746 ConnectionState state;
750 int bodyTransferSize;
751 SGTimeStamp idleTime;
752 bool chunkedTransfer;
755 RequestList queuedRequests;
756 RequestList sentRequests;
758 ContentDecoder _contentDecoder;
759 std::string _connectionId;
760 unsigned int _maxPipelineLength;
762 #endif // of !ENABLE_CURL
768 d->maxConnections = 4;
769 d->maxHostConnections = 4;
770 d->bytesTransferred = 0;
771 d->lastTransferRate = 0;
772 d->timeTransferSample.stamp();
773 d->totalBytesDownloaded = 0;
774 d->maxPipelineDepth = 5;
775 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
776 #if defined(ENABLE_CURL)
777 static bool didInitCurlGlobal = false;
778 if (!didInitCurlGlobal) {
779 curl_global_init(CURL_GLOBAL_ALL);
780 didInitCurlGlobal = true;
783 d->createCurlMulti();
789 #if defined(ENABLE_CURL)
790 curl_multi_cleanup(d->curlMulti);
794 void Client::setMaxConnections(unsigned int maxCon)
796 d->maxConnections = maxCon;
797 #if defined(ENABLE_CURL)
798 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
802 void Client::setMaxHostConnections(unsigned int maxHostCon)
804 d->maxHostConnections = maxHostCon;
805 #if defined(ENABLE_CURL)
806 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
810 void Client::setMaxPipelineDepth(unsigned int depth)
812 d->maxPipelineDepth = depth;
813 #if defined(ENABLE_CURL)
814 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
816 ConnectionDict::iterator it = d->connections.begin();
817 for (; it != d->connections.end(); ) {
818 it->second->setMaxPipelineLength(depth);
823 void Client::update(int waitTimeout)
825 #if defined(ENABLE_CURL)
826 int remainingActive, messagesInQueue;
827 curl_multi_perform(d->curlMulti, &remainingActive);
830 while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
831 if (msg->msg == CURLMSG_DONE) {
833 CURL *e = msg->easy_handle;
834 curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq);
836 // ensure request stays valid for the moment
837 // eg if responseComplete cancels us
838 Request_ptr req(rawReq);
841 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
843 // remove from the requests map now,
844 // in case the callbacks perform a cancel. We'll use
845 // the absence from the request dict in cancel to avoid
847 ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req);
848 assert(it != d->requests.end());
849 assert(it->second == e);
850 d->requests.erase(it);
852 if (msg->data.result == 0) {
853 req->responseComplete();
855 SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result));
856 req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
859 curl_multi_remove_handle(d->curlMulti, e);
860 curl_easy_cleanup(e);
862 // should never happen since CURLMSG_DONE is the only code
864 SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg);
866 } // of curl message processing loop
867 SGTimeStamp::sleepForMSec(waitTimeout);
869 if (!d->poller.hasChannels() && (waitTimeout > 0)) {
870 SGTimeStamp::sleepForMSec(waitTimeout);
872 d->poller.poll(waitTimeout);
875 bool waitingRequests = !d->pendingRequests.empty();
876 ConnectionDict::iterator it = d->connections.begin();
877 for (; it != d->connections.end(); ) {
878 Connection* con = it->second;
879 if (con->hasIdleTimeout() ||
881 con->hasErrorTimeout() ||
882 (!con->isActive() && waitingRequests))
884 if (con->hasErrorTimeout()) {
885 // tell the connection we're timing it out
886 con->handleTimeout();
889 // connection has been idle for a while, clean it up
890 // (or if we have requests waiting for a different host,
891 // or an error condition
892 ConnectionDict::iterator del = it++;
894 d->connections.erase(del);
896 if (it->second->shouldStartNext()) {
897 it->second->tryStartNextRequest();
901 } // of connection iteration
903 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
904 RequestList waiting(d->pendingRequests);
905 d->pendingRequests.clear();
907 // re-submit all waiting requests in order; this takes care of
908 // finding multiple pending items targetted to the same (new)
910 BOOST_FOREACH(Request_ptr req, waiting) {
917 void Client::makeRequest(const Request_ptr& r)
919 if( r->isComplete() )
922 if( r->url().find("://") == std::string::npos ) {
923 r->setFailure(EINVAL, "malformed URL");
929 #if defined(ENABLE_CURL)
930 ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
931 assert(rit == d->requests.end());
933 CURL* curlRequest = curl_easy_init();
934 curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
936 d->requests[r] = curlRequest;
938 curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
939 // disable built-in libCurl progress feedback
940 curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
942 curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
943 curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
944 curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
945 curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
947 curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
948 curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
950 if (!d->proxy.empty()) {
951 curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
952 curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
954 if (!d->proxyAuth.empty()) {
955 curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
956 curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
960 std::string method = boost::to_lower_copy(r->method());
961 if (method == "get") {
962 curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
963 } else if (method == "put") {
964 curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
965 curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
966 } else if (method == "post") {
967 // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
968 curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
970 std::string q = r->query().substr(1);
971 curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
973 // reset URL to exclude query pieces
974 std::string urlWithoutQuery = r->url();
975 std::string::size_type queryPos = urlWithoutQuery.find('?');
976 urlWithoutQuery.resize(queryPos);
977 curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
979 curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
982 struct curl_slist* headerList = NULL;
983 if (r->hasBodyData() && (method != "post")) {
984 curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
985 curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
986 curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
987 curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
988 std::string h = "Content-Type:" + r->bodyType();
989 headerList = curl_slist_append(headerList, h.c_str());
992 StringMap::const_iterator it;
993 for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
994 std::string h = it->first + ": " + it->second;
995 headerList = curl_slist_append(headerList, h.c_str());
998 if (headerList != NULL) {
999 curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
1002 curl_multi_add_handle(d->curlMulti, curlRequest);
1004 // this seems premature, but we don't have a callback from Curl we could
1005 // use to trigger when the requst is actually sent.
1009 if( r->url().find("http://") != 0 ) {
1010 r->setFailure(EINVAL, "only HTTP protocol is supported");
1014 std::string host = r->host();
1015 int port = r->port();
1016 if (!d->proxy.empty()) {
1018 port = d->proxyPort;
1021 Connection* con = NULL;
1022 std::stringstream ss;
1023 ss << host << "-" << port;
1024 std::string connectionId = ss.str();
1025 bool havePending = !d->pendingRequests.empty();
1026 bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
1027 ConnectionDict::iterator consEnd = d->connections.end();
1029 // assign request to an existing Connection.
1030 // various options exist here, examined in order
1031 ConnectionDict::iterator it = d->connections.find(connectionId);
1032 if (atConnectionsLimit && (it == consEnd)) {
1033 // maximum number of connections active, queue this request
1034 // when a connection goes inactive, we'll start this one
1035 d->pendingRequests.push_back(r);
1039 // scan for an idle Connection to the same host (likely if we're
1040 // retrieving multiple resources from the same host in quick succession)
1041 // if we have pending requests (waiting for a free Connection), then
1042 // force new requests on this id to always use the first Connection
1043 // (instead of the random selection below). This ensures that when
1044 // there's pressure on the number of connections to keep alive, one
1045 // host can't DoS every other.
1047 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
1048 if (havePending || !it->second->isActive()) {
1054 bool atHostConnectionsLimit = (count >= d->maxHostConnections);
1056 if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
1057 // all current connections are busy (active), and we don't
1058 // have free connections to allocate, so let's assign to
1059 // an existing one randomly. Ideally we'd used whichever one will
1060 // complete first but we don't have that info.
1061 int index = rand() % count;
1062 for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
1066 // allocate a new connection object
1068 static int connectionSuffx = 0;
1070 std::stringstream ss;
1071 ss << connectionId << "-" << connectionSuffx++;
1073 SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
1074 con = new Connection(this, ss.str());
1075 con->setServer(host, port);
1076 con->setMaxPipelineLength(d->maxPipelineDepth);
1077 d->poller.addChannel(con);
1078 d->connections.insert(d->connections.end(),
1079 ConnectionDict::value_type(connectionId, con));
1082 SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
1083 con->queueRequest(r);
1087 void Client::cancelRequest(const Request_ptr &r, std::string reason)
1089 #if defined(ENABLE_CURL)
1090 ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
1091 if(it == d->requests.end()) {
1092 // already being removed, presumably inside ::update()
1093 // nothing more to do
1097 CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second);
1098 assert(err == CURLM_OK);
1100 // clear the request pointer form the curl-easy object
1101 curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0);
1103 curl_easy_cleanup(it->second);
1104 d->requests.erase(it);
1106 ConnectionDict::iterator it = d->connections.begin();
1107 for (; it != d->connections.end(); ++it) {
1108 (it->second)->cancelRequest(r);
1111 r->setFailure(-1, reason);
1114 //------------------------------------------------------------------------------
1115 FileRequestRef Client::save( const std::string& url,
1116 const std::string& filename )
1118 FileRequestRef req = new FileRequest(url, filename);
1123 //------------------------------------------------------------------------------
1124 MemoryRequestRef Client::load(const std::string& url)
1126 MemoryRequestRef req = new MemoryRequest(url);
1131 void Client::requestFinished(Connection* con)
1136 void Client::setUserAgent(const std::string& ua)
1141 const std::string& Client::userAgent() const
1143 return d->userAgent;
1146 const std::string& Client::proxyHost() const
1151 const std::string& Client::proxyAuth() const
1153 return d->proxyAuth;
1156 void Client::setProxy( const std::string& proxy,
1158 const std::string& auth )
1161 d->proxyPort = port;
1162 d->proxyAuth = auth;
1165 bool Client::hasActiveRequests() const
1167 #if defined(ENABLE_CURL)
1168 return !d->requests.empty();
1170 ConnectionDict::const_iterator it = d->connections.begin();
1171 for (; it != d->connections.end(); ++it) {
1172 if (it->second->isActive()) return true;
1179 void Client::receivedBytes(unsigned int count)
1181 d->bytesTransferred += count;
1182 d->totalBytesDownloaded += count;
1185 unsigned int Client::transferRateBytesPerSec() const
1187 unsigned int e = d->timeTransferSample.elapsedMSec();
1189 // too long a window, ignore
1190 d->timeTransferSample.stamp();
1191 d->bytesTransferred = 0;
1192 d->lastTransferRate = 0;
1196 if (e < 100) { // avoid really narrow windows
1197 return d->lastTransferRate;
1200 unsigned int ratio = (d->bytesTransferred * 1000) / e;
1201 // run a low-pass filter
1202 unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1205 d->timeTransferSample.stamp();
1206 d->bytesTransferred = 0;
1207 d->lastTransferRate = smoothed;
1211 uint64_t Client::totalBytesDownloaded() const
1213 return d->totalBytesDownloaded;
1216 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1218 size_t byteSize = size * nmemb;
1219 Request* req = static_cast<Request*>(userdata);
1220 req->processBodyBytes(ptr, byteSize);
1222 Client* cl = req->http();
1224 cl->receivedBytes(byteSize);
1230 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1232 size_t maxBytes = size * nmemb;
1233 Request* req = static_cast<Request*>(userdata);
1234 size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1238 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1240 size_t byteSize = size * nitems;
1241 Request* req = static_cast<Request*>(userdata);
1242 std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1244 if (req->readyState() == HTTP::Request::OPENED) {
1245 req->responseStart(h);
1250 // got a 100-continue reponse; restart
1251 if (req->responseCode() == 100) {
1252 req->setReadyState(HTTP::Request::OPENED);
1256 req->responseHeadersComplete();
1260 if (req->responseCode() == 100) {
1261 return byteSize; // skip headers associated with 100-continue status
1264 size_t colonPos = h.find(':');
1265 if (colonPos == std::string::npos) {
1266 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1270 std::string key = strutils::simplify(h.substr(0, colonPos));
1271 std::string lkey = boost::to_lower_copy(key);
1272 std::string value = strutils::strip(h.substr(colonPos + 1));
1274 req->responseHeader(lkey, value);
1278 void Client::debugDumpRequests()
1280 #if defined(ENABLE_CURL)
1281 SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
1282 ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
1283 for (; it != d->requests.end(); ++it) {
1284 SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
1286 SG_LOG(SG_IO, SG_INFO, "==");
1288 SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1289 ConnectionDict::iterator it = d->connections.begin();
1290 for (; it != d->connections.end(); ++it) {
1291 it->second->debugDumpRequests();
1293 SG_LOG(SG_IO, SG_INFO, "==");
1297 void Client::clearAllConnections()
1299 #if defined(ENABLE_CURL)
1300 curl_multi_cleanup(d->curlMulti);
1301 d->createCurlMulti();
1303 ConnectionDict::iterator it = d->connections.begin();
1304 for (; it != d->connections.end(); ++it) {
1307 d->connections.clear();
1311 } // of namespace HTTP
1313 } // of namespace simgear