2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
30 #include <cstdlib> // rand()
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
39 #include <simgear/simgear_config.h>
41 #if defined(ENABLE_CURL)
42 #include <curl/multi.h>
44 #include <simgear/io/HTTPContentDecode.hxx>
47 #include <simgear/io/sg_netChat.hxx>
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
58 # if !defined(SIMGEAR_VERSION)
59 # define SIMGEAR_VERSION "simgear-development"
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
71 const unsigned int MAX_INFLIGHT_REQUESTS = 4;
74 typedef std::multimap<std::string, Connection*> ConnectionDict;
75 typedef std::list<Request_ptr> RequestList;
77 class Client::ClientPrivate
80 #if defined(ENABLE_CURL)
82 bool haveActiveRequests;
84 void createCurlMulti()
86 curlMulti = curl_multi_init();
87 // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
88 // we request HTTP 1.1 pipelining
89 curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
90 curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
91 curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
92 (long) MAX_INFLIGHT_REQUESTS);
96 NetChannelPoller poller;
97 // connections by host (potentially more than one)
98 ConnectionDict connections;
101 std::string userAgent;
104 std::string proxyAuth;
105 unsigned int maxConnections;
107 RequestList pendingRequests;
111 SGTimeStamp timeTransferSample;
112 unsigned int bytesTransferred;
113 unsigned int lastTransferRate;
114 uint64_t totalBytesDownloaded;
117 #if !defined(ENABLE_CURL)
118 class Connection : public NetChat
121 Connection(Client* pr, const std::string& conId) :
124 port(DEFAULT_HTTP_PORT),
129 virtual ~Connection()
133 virtual void handleBufferRead (NetBuffer& buffer)
135 if( !activeRequest || !activeRequest->isComplete() )
136 return NetChat::handleBufferRead(buffer);
138 // Request should be aborted (signaled by setting its state to complete).
140 // force the state to GETTING_BODY, to simplify logic in
141 // responseComplete and handleClose
142 state = STATE_GETTING_BODY;
146 void setServer(const std::string& h, short p)
152 // socket-level errors
153 virtual void handleError(int error)
155 const char* errStr = strerror(error);
156 SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
163 // connection level failure, eg name lookup or routing
164 // we won't have an active request yet, so let's fail all of the
165 // requests since we presume it's a systematic failure for
166 // the host in question
167 BOOST_FOREACH(Request_ptr req, sentRequests) {
168 req->setFailure(error, errStr);
171 BOOST_FOREACH(Request_ptr req, queuedRequests) {
172 req->setFailure(error, errStr);
175 sentRequests.clear();
176 queuedRequests.clear();
179 NetChat::handleError(error);
181 activeRequest->setFailure(error, errStr);
182 activeRequest = NULL;
183 _contentDecoder.reset();
186 state = STATE_SOCKET_ERROR;
191 handleError(ETIMEDOUT);
194 virtual void handleClose()
196 NetChat::handleClose();
198 // closing of the connection from the server side when getting the body,
199 bool canCloseState = (state == STATE_GETTING_BODY);
200 if (canCloseState && activeRequest) {
201 // force state here, so responseComplete can avoid closing the
203 state = STATE_CLOSED;
206 if (state == STATE_WAITING_FOR_RESPONSE) {
207 assert(!sentRequests.empty());
208 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
209 // no active request, but don't restore the front sent one
210 sentRequests.erase(sentRequests.begin());
214 activeRequest->setFailure(500, "server closed connection");
215 // remove the failed request from sentRequests, so it does
217 RequestList::iterator it = std::find(sentRequests.begin(),
218 sentRequests.end(), activeRequest);
219 if (it != sentRequests.end()) {
220 sentRequests.erase(it);
222 activeRequest = NULL;
223 _contentDecoder.reset();
226 state = STATE_CLOSED;
229 if (sentRequests.empty()) {
233 // restore sent requests to the queue, so they will be re-sent
234 // when the connection opens again
235 queuedRequests.insert(queuedRequests.begin(),
236 sentRequests.begin(), sentRequests.end());
237 sentRequests.clear();
240 void queueRequest(const Request_ptr& r)
242 queuedRequests.push_back(r);
243 tryStartNextRequest();
248 assert(!sentRequests.empty());
249 assert(state == STATE_WAITING_FOR_RESPONSE);
251 activeRequest = sentRequests.front();
253 activeRequest->responseStart(buffer);
254 } catch (sg_exception& e) {
259 state = STATE_GETTING_HEADERS;
261 if (activeRequest->responseCode() == 204) {
262 noMessageBody = true;
263 } else if (activeRequest->method() == "HEAD") {
264 noMessageBody = true;
266 noMessageBody = false;
269 bodyTransferSize = -1;
270 chunkedTransfer = false;
271 _contentDecoder.reset();
274 void tryStartNextRequest()
276 while( !queuedRequests.empty()
277 && queuedRequests.front()->isComplete() )
278 queuedRequests.pop_front();
280 if (queuedRequests.empty()) {
285 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
289 if (state == STATE_CLOSED) {
290 if (!connectToHost()) {
295 setTerminator("\r\n");
299 Request_ptr r = queuedRequests.front();
302 std::stringstream headerData;
303 std::string path = r->path();
304 assert(!path.empty());
305 std::string query = r->query();
306 std::string bodyData;
308 if (!client->proxyHost().empty()) {
309 path = r->scheme() + "://" + r->host() + r->path();
312 if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
313 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
314 bodyData = query.substr(1); // URL-encode, drop the leading '?'
315 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
316 headerData << "Content-Length:" << bodyData.size() << "\r\n";
318 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
319 if( r->hasBodyData() )
321 headerData << "Content-Length:" << r->bodyLength() << "\r\n";
322 headerData << "Content-Type:" << r->bodyType() << "\r\n";
326 headerData << "Host: " << r->hostAndPort() << "\r\n";
327 headerData << "User-Agent:" << client->userAgent() << "\r\n";
328 headerData << "Accept-Encoding: deflate, gzip\r\n";
329 if (!client->proxyAuth().empty()) {
330 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
333 BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
334 headerData << h.first << ": " << h.second << "\r\n";
337 headerData << "\r\n"; // final CRLF to terminate the headers
338 if (!bodyData.empty()) {
339 headerData << bodyData;
342 bool ok = push(headerData.str().c_str());
344 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
345 // we've over-stuffed the socket, give up for now, let things
346 // drain down before trying to start any more requests.
350 if( r->hasBodyData() )
351 for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
354 size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
357 if( !bufferSend(buf, len) )
361 "overflow the HTTP::Connection output buffer");
362 state = STATE_SOCKET_ERROR;
365 body_bytes_sent += len;
371 "HTTP asynchronous request body generation is unsupported");
376 SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
377 // successfully sent, remove from queue, and maybe send the next
378 queuedRequests.pop_front();
379 sentRequests.push_back(r);
380 if (state == STATE_IDLE) {
381 state = STATE_WAITING_FOR_RESPONSE;
384 // pipelining, let's maybe send the next request right away
385 tryStartNextRequest();
388 virtual void collectIncomingData(const char* s, int n)
391 client->receivedBytes(static_cast<unsigned int>(n));
393 if( (state == STATE_GETTING_BODY)
394 || (state == STATE_GETTING_CHUNKED_BYTES) )
395 _contentDecoder.receivedBytes(s, n);
400 virtual void foundTerminator(void)
404 case STATE_WAITING_FOR_RESPONSE:
408 case STATE_GETTING_HEADERS:
413 case STATE_GETTING_BODY:
417 case STATE_GETTING_CHUNKED:
418 processChunkHeader();
421 case STATE_GETTING_CHUNKED_BYTES:
422 setTerminator("\r\n");
423 state = STATE_GETTING_CHUNKED;
428 case STATE_GETTING_TRAILER:
434 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
441 bool hasIdleTimeout() const
443 if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
447 assert(sentRequests.empty());
448 bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
452 bool hasErrorTimeout() const
454 if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
458 bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
462 bool hasError() const
464 return (state == STATE_SOCKET_ERROR);
467 bool shouldStartNext() const
469 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
472 bool isActive() const
474 return !queuedRequests.empty() || !sentRequests.empty();
477 void debugDumpRequests() const
479 SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
480 << "; state=" << state << ")");
482 SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
484 SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
487 BOOST_FOREACH(Request_ptr req, sentRequests) {
488 SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
491 BOOST_FOREACH(Request_ptr req, queuedRequests) {
492 SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
498 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
501 SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
505 if (connect(host.c_str(), port) != 0) {
506 SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
516 std::string h = strutils::simplify(buffer);
517 if (h.empty()) { // blank line terminates headers
522 int colonPos = buffer.find(':');
524 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
528 std::string key = strutils::simplify(buffer.substr(0, colonPos));
529 std::string lkey = boost::to_lower_copy(key);
530 std::string value = strutils::strip(buffer.substr(colonPos + 1));
532 // only consider these if getting headers (as opposed to trailers
533 // of a chunked transfer)
534 if (state == STATE_GETTING_HEADERS) {
535 if (lkey == "content-length") {
537 int sz = strutils::to_int(value);
538 if (bodyTransferSize <= 0) {
539 bodyTransferSize = sz;
541 activeRequest->setResponseLength(sz);
542 } else if (lkey == "transfer-length") {
543 bodyTransferSize = strutils::to_int(value);
544 } else if (lkey == "transfer-encoding") {
545 processTransferEncoding(value);
546 } else if (lkey == "content-encoding") {
547 _contentDecoder.setEncoding(value);
551 activeRequest->responseHeader(lkey, value);
554 void processTransferEncoding(const std::string& te)
556 if (te == "chunked") {
557 chunkedTransfer = true;
559 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
564 void processChunkHeader()
566 if (buffer.empty()) {
567 // blank line after chunk data
572 int semiPos = buffer.find(';');
574 // extensions ignored for the moment
575 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
577 chunkSize = strutils::to_int(buffer, 16);
581 if (chunkSize == 0) { // trailer start
582 state = STATE_GETTING_TRAILER;
586 state = STATE_GETTING_CHUNKED_BYTES;
587 setByteCount(chunkSize);
590 void processTrailer()
592 if (buffer.empty()) {
598 // process as a normal header
602 void headersComplete()
604 activeRequest->responseHeadersComplete();
605 _contentDecoder.initWithRequest(activeRequest);
607 if (chunkedTransfer) {
608 state = STATE_GETTING_CHUNKED;
609 } else if (noMessageBody || (bodyTransferSize == 0)) {
610 // force the state to GETTING_BODY, to simplify logic in
611 // responseComplete and handleClose
612 state = STATE_GETTING_BODY;
615 setByteCount(bodyTransferSize); // may be -1, that's fine
616 state = STATE_GETTING_BODY;
620 void responseComplete()
622 Request_ptr completedRequest = activeRequest;
623 _contentDecoder.finish();
625 assert(sentRequests.front() == activeRequest);
626 sentRequests.pop_front();
627 bool doClose = activeRequest->closeAfterComplete();
628 activeRequest = NULL;
630 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
632 // this will bring us into handleClose() above, which updates
633 // state to STATE_CLOSED
636 // if we have additional requests waiting, try to start them now
637 tryStartNextRequest();
641 if (state != STATE_CLOSED) {
642 state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
645 // notify request after we change state, so this connection is idle
646 // if completion triggers other requests (which is likely)
647 completedRequest->responseComplete();
648 client->requestFinished(this);
650 setTerminator("\r\n");
653 enum ConnectionState {
655 STATE_WAITING_FOR_RESPONSE,
656 STATE_GETTING_HEADERS,
658 STATE_GETTING_CHUNKED,
659 STATE_GETTING_CHUNKED_BYTES,
660 STATE_GETTING_TRAILER,
662 STATE_CLOSED ///< connection should be closed now
666 Request_ptr activeRequest;
667 ConnectionState state;
671 int bodyTransferSize;
672 SGTimeStamp idleTime;
673 bool chunkedTransfer;
676 RequestList queuedRequests;
677 RequestList sentRequests;
679 ContentDecoder _contentDecoder;
680 std::string connectionId;
682 #endif // of !ENABLE_CURL
688 d->maxConnections = 4;
689 d->bytesTransferred = 0;
690 d->lastTransferRate = 0;
691 d->timeTransferSample.stamp();
692 d->totalBytesDownloaded = 0;
694 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
695 #if defined(ENABLE_CURL)
696 static bool didInitCurlGlobal = false;
697 if (!didInitCurlGlobal) {
698 curl_global_init(CURL_GLOBAL_ALL);
699 didInitCurlGlobal = true;
702 d->createCurlMulti();
708 #if defined(ENABLE_CURL)
709 curl_multi_cleanup(d->curlMulti);
713 void Client::setMaxConnections(unsigned int maxCon)
716 throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
719 d->maxConnections = maxCon;
720 #if defined(ENABLE_CURL)
721 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
725 void Client::update(int waitTimeout)
727 #if defined(ENABLE_CURL)
728 int remainingActive, messagesInQueue;
729 curl_multi_perform(d->curlMulti, &remainingActive);
730 d->haveActiveRequests = (remainingActive > 0);
733 while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
734 if (msg->msg == CURLMSG_DONE) {
736 CURL *e = msg->easy_handle;
737 curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
740 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
742 if (msg->data.result == 0) {
743 req->responseComplete();
745 fprintf(stderr, "Result: %d - %s\n",
746 msg->data.result, curl_easy_strerror(msg->data.result));
747 req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
750 curl_multi_remove_handle(d->curlMulti, e);
752 // balance the reference we take in makeRequest
753 SGReferenced::put(req);
754 curl_easy_cleanup(e);
757 SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
759 } // of curl message processing loop
761 if (!d->poller.hasChannels() && (waitTimeout > 0)) {
762 SGTimeStamp::sleepForMSec(waitTimeout);
764 d->poller.poll(waitTimeout);
767 bool waitingRequests = !d->pendingRequests.empty();
768 ConnectionDict::iterator it = d->connections.begin();
769 for (; it != d->connections.end(); ) {
770 Connection* con = it->second;
771 if (con->hasIdleTimeout() ||
773 con->hasErrorTimeout() ||
774 (!con->isActive() && waitingRequests))
776 if (con->hasErrorTimeout()) {
777 // tell the connection we're timing it out
778 con->handleTimeout();
781 // connection has been idle for a while, clean it up
782 // (or if we have requests waiting for a different host,
783 // or an error condition
784 ConnectionDict::iterator del = it++;
786 d->connections.erase(del);
788 if (it->second->shouldStartNext()) {
789 it->second->tryStartNextRequest();
793 } // of connection iteration
795 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
796 RequestList waiting(d->pendingRequests);
797 d->pendingRequests.clear();
799 // re-submit all waiting requests in order; this takes care of
800 // finding multiple pending items targeted to the same (new)
802 BOOST_FOREACH(Request_ptr req, waiting) {
809 void Client::makeRequest(const Request_ptr& r)
811 if( r->isComplete() )
814 if( r->url().find("://") == std::string::npos ) {
815 r->setFailure(EINVAL, "malformed URL");
819 #if defined(ENABLE_CURL)
820 CURL* curlRequest = curl_easy_init();
821 curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
823 // manually increase the ref count of the request
824 SGReferenced::get(r.get());
825 curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
826 // disable built-in libCurl progress feedback
827 curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
829 curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
830 curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
831 curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
832 curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
834 curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
835 curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
837 if (!d->proxy.empty()) {
838 curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
839 curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
841 if (!d->proxyAuth.empty()) {
842 curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
843 curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
847 std::string method = boost::to_lower_copy(r->method());
848 if (method == "get") {
849 curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
850 } else if (method == "put") {
851 curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
852 curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
853 } else if (method == "post") {
854 // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
855 curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
857 std::string q = r->query().substr(1);
858 curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
860 // reset URL to exclude query pieces
861 std::string urlWithoutQuery = r->url();
862 std::string::size_type queryPos = urlWithoutQuery.find('?');
863 urlWithoutQuery.resize(queryPos);
864 curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
866 curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
869 struct curl_slist* headerList = NULL;
870 if (r->hasBodyData() && (method != "post")) {
871 curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
872 curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
873 curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
874 curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
875 std::string h = "Content-Type:" + r->bodyType();
876 headerList = curl_slist_append(headerList, h.c_str());
879 StringMap::const_iterator it;
880 for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
881 std::string h = it->first + ": " + it->second;
882 headerList = curl_slist_append(headerList, h.c_str());
885 if (headerList != NULL) {
886 curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
889 curl_multi_add_handle(d->curlMulti, curlRequest);
890 d->haveActiveRequests = true;
892 // FIXME - premature?
896 if( r->url().find("http://") != 0 ) {
897 r->setFailure(EINVAL, "only HTTP protocol is supported");
901 std::string host = r->host();
902 int port = r->port();
903 if (!d->proxy.empty()) {
908 Connection* con = NULL;
909 std::stringstream ss;
910 ss << host << "-" << port;
911 std::string connectionId = ss.str();
912 bool havePending = !d->pendingRequests.empty();
913 bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
914 ConnectionDict::iterator consEnd = d->connections.end();
916 // assign request to an existing Connection.
917 // various options exist here, examined in order
918 ConnectionDict::iterator it = d->connections.find(connectionId);
919 if (atConnectionsLimit && (it == consEnd)) {
920 // maximum number of connections active, queue this request
921 // when a connection goes inactive, we'll start this one
922 d->pendingRequests.push_back(r);
926 // scan for an idle Connection to the same host (likely if we're
927 // retrieving multiple resources from the same host in quick succession)
928 // if we have pending requests (waiting for a free Connection), then
929 // force new requests on this id to always use the first Connection
930 // (instead of the random selection below). This ensures that when
931 // there's pressure on the number of connections to keep alive, one
932 // host can't DoS every other.
934 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
935 if (havePending || !it->second->isActive()) {
941 if (!con && atConnectionsLimit) {
942 // all current connections are busy (active), and we don't
943 // have free connections to allocate, so let's assign to
944 // an existing one randomly. Ideally we'd used whichever one will
945 // complete first but we don't have that info.
946 int index = rand() % count;
947 for (it = d->connections.find(connectionId); index > 0; --index) { ; }
951 // allocate a new connection object
953 con = new Connection(this, connectionId);
954 con->setServer(host, port);
955 d->poller.addChannel(con);
956 d->connections.insert(d->connections.end(),
957 ConnectionDict::value_type(connectionId, con));
960 con->queueRequest(r);
964 //------------------------------------------------------------------------------
965 FileRequestRef Client::save( const std::string& url,
966 const std::string& filename )
968 FileRequestRef req = new FileRequest(url, filename);
973 //------------------------------------------------------------------------------
974 MemoryRequestRef Client::load(const std::string& url)
976 MemoryRequestRef req = new MemoryRequest(url);
981 void Client::requestFinished(Connection* con)
986 void Client::setUserAgent(const std::string& ua)
991 const std::string& Client::userAgent() const
996 const std::string& Client::proxyHost() const
1001 const std::string& Client::proxyAuth() const
1003 return d->proxyAuth;
1006 void Client::setProxy( const std::string& proxy,
1008 const std::string& auth )
1011 d->proxyPort = port;
1012 d->proxyAuth = auth;
1015 bool Client::hasActiveRequests() const
1017 #if defined(ENABLE_CURL)
1018 return d->haveActiveRequests;
1020 ConnectionDict::const_iterator it = d->connections.begin();
1021 for (; it != d->connections.end(); ++it) {
1022 if (it->second->isActive()) return true;
1029 void Client::receivedBytes(unsigned int count)
1031 d->bytesTransferred += count;
1032 d->totalBytesDownloaded += count;
1035 unsigned int Client::transferRateBytesPerSec() const
1037 unsigned int e = d->timeTransferSample.elapsedMSec();
1039 // too long a window, ignore
1040 d->timeTransferSample.stamp();
1041 d->bytesTransferred = 0;
1042 d->lastTransferRate = 0;
1046 if (e < 100) { // avoid really narrow windows
1047 return d->lastTransferRate;
1050 unsigned int ratio = (d->bytesTransferred * 1000) / e;
1051 // run a low-pass filter
1052 unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1055 d->timeTransferSample.stamp();
1056 d->bytesTransferred = 0;
1057 d->lastTransferRate = smoothed;
1061 uint64_t Client::totalBytesDownloaded() const
1063 return d->totalBytesDownloaded;
1066 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1068 size_t byteSize = size * nmemb;
1070 Request* req = static_cast<Request*>(userdata);
1071 req->processBodyBytes(ptr, byteSize);
1075 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1077 size_t maxBytes = size * nmemb;
1078 Request* req = static_cast<Request*>(userdata);
1079 size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1083 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1085 size_t byteSize = size * nitems;
1086 Request* req = static_cast<Request*>(userdata);
1087 std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1089 if (req->readyState() == HTTP::Request::OPENED) {
1090 req->responseStart(h);
1095 // got a 100-continue reponse; restart
1096 if (req->responseCode() == 100) {
1097 req->setReadyState(HTTP::Request::OPENED);
1101 req->responseHeadersComplete();
1105 if (req->responseCode() == 100) {
1106 return byteSize; // skip headers associated with 100-continue status
1109 size_t colonPos = h.find(':');
1110 if (colonPos == std::string::npos) {
1111 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1115 std::string key = strutils::simplify(h.substr(0, colonPos));
1116 std::string lkey = boost::to_lower_copy(key);
1117 std::string value = strutils::strip(h.substr(colonPos + 1));
1119 req->responseHeader(lkey, value);
1123 void Client::debugDumpRequests()
1125 #if defined(ENABLE_CURL)
1128 SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1129 ConnectionDict::iterator it = d->connections.begin();
1130 for (; it != d->connections.end(); ++it) {
1131 it->second->debugDumpRequests();
1133 SG_LOG(SG_IO, SG_INFO, "==");
1137 void Client::clearAllConnections()
1139 #if defined(ENABLE_CURL)
1140 curl_multi_cleanup(d->curlMulti);
1141 d->createCurlMulti();
1143 ConnectionDict::iterator it = d->connections.begin();
1144 for (; it != d->connections.end(); ++it) {
1147 d->connections.clear();
1151 } // of namespace HTTP
1153 } // of namespace simgear