2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
30 #include <cstdlib> // rand()
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
39 #include <simgear/simgear_config.h>
41 #if defined(ENABLE_CURL)
42 #include <curl/multi.h>
44 #include <simgear/io/HTTPContentDecode.hxx>
47 #include <simgear/io/sg_netChat.hxx>
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
58 # if !defined(SIMGEAR_VERSION)
59 # define SIMGEAR_VERSION "simgear-development"
// Port a Connection starts out with before setServer() is called
// (it is the initial value of Connection::port).
69 extern const int DEFAULT_HTTP_PORT = 80;
// Body type for which the non-curl request path sends the URL query string
// as the request body instead of appending it to the request line.
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Connections keyed by a string ID; a multimap so that one key can map to
// several Connection objects.
73 typedef std::multimap<std::string, Connection*> ConnectionDict;
// Ordered list of requests (used for queued, sent and pending requests).
74 typedef std::list<Request_ptr> RequestList;
// Private state behind Client (reached through Client's `d` pointer):
// curl multi-handle setup when ENABLE_CURL is defined, connection
// bookkeeping, the configured limits and the transfer statistics.
76 class Client::ClientPrivate
79 #if defined(ENABLE_CURL)
// refreshed from curl_multi_perform() in Client::update() and set to true
// whenever Client::makeRequest() adds a transfer
81 bool haveActiveRequests;
// Creates the curl multi handle and applies the pipelining option and the
// three configured limits (total connections, pipeline length, connections
// per host) to it.
83 void createCurlMulti()
85 curlMulti = curl_multi_init();
86 // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
87 // we request HTTP 1.1 pipelining
88 curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
89 curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
90 curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
91 (long) maxPipelineDepth);
92 curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
93 (long) maxHostConnections);
// polled from Client::update(); new Connection objects are registered with it
// in Client::makeRequest()
98 NetChannelPoller poller;
99 // connections by host (potentially more than one)
100 ConnectionDict connections;
// value returned by Client::userAgent() and passed to CURLOPT_USERAGENT
103 std::string userAgent;
// proxy credentials: sent as CURLOPT_PROXYUSERPWD by the curl path and as the
// Proxy-Authorization header by Connection
106 std::string proxyAuth;
// limits; the values assigned at Client set-up are 4, 4 and 5 respectively
107 unsigned int maxConnections;
108 unsigned int maxHostConnections;
109 unsigned int maxPipelineDepth;
// requests parked while the connection limit is reached; Client::update()
// re-submits them
111 RequestList pendingRequests;
// sampling window state used by Client::transferRateBytesPerSec()
113 SGTimeStamp timeTransferSample;
114 unsigned int bytesTransferred;
115 unsigned int lastTransferRate;
// running total accumulated by Client::receivedBytes()
116 uint64_t totalBytesDownloaded;
119 #if !defined(ENABLE_CURL)
// One HTTP connection to a host, built on NetChat (only compiled when
// ENABLE_CURL is not defined). It keeps two request lists: queuedRequests
// (not yet written to the socket) and sentRequests (written, awaiting their
// responses), and parses responses with a small state machine (see
// ConnectionState below).
// NOTE(review): several method headers and short statements are not visible
// in this view of the file; comments below describe only the visible lines.
120 class Connection : public NetChat
// pr: owning client; conId: identifier used as a prefix in log messages.
// The pipeline length starts at 255 until setMaxPipelineLength() is called.
123 Connection(Client* pr, const std::string& conId) :
126 port(DEFAULT_HTTP_PORT),
127 _connectionId(conId),
128 _maxPipelineLength(255)
132 virtual ~Connection()
// Normal data is handed to NetChat. If the active request has already been
// marked complete (i.e. it was aborted), the special path below is taken.
136 virtual void handleBufferRead (NetBuffer& buffer)
138 if( !activeRequest || !activeRequest->isComplete() )
139 return NetChat::handleBufferRead(buffer);
141 // Request should be aborted (signaled by setting its state to complete).
143 // force the state to GETTING_BODY, to simplify logic in
144 // responseComplete and handleClose
145 setState(STATE_GETTING_BODY);
// NOTE(review): the port parameter is a short, while Client::makeRequest()
// passes an int obtained from Request::port(); ports above 32767 would not
// fit - confirm.
149 void setServer(const std::string& h, short p)
// Upper bound on sentRequests.size() before further requests are held back.
155 void setMaxPipelineLength(unsigned int m)
157 _maxPipelineLength = m;
160 // socket-level errors
// error is passed to strerror() for the message; the visible code fails
// every sent and queued request with it, clears both lists, fails the
// active request and moves to STATE_SOCKET_ERROR.
161 virtual void handleError(int error)
163 const char* errStr = strerror(error);
164 SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
171 // connection level failure, eg name lookup or routing
172 // we won't have an active request yet, so let's fail all of the
173 // requests since we presume it's a systematic failure for
174 // the host in question
175 BOOST_FOREACH(Request_ptr req, sentRequests) {
176 req->setFailure(error, errStr);
179 BOOST_FOREACH(Request_ptr req, queuedRequests) {
180 req->setFailure(error, errStr);
183 sentRequests.clear();
184 queuedRequests.clear();
187 NetChat::handleError(error);
189 activeRequest->setFailure(error, errStr);
190 activeRequest = NULL;
191 _contentDecoder.reset();
194 setState(STATE_SOCKET_ERROR);
// presumably the body of handleTimeout() (called from Client::update()):
// a timeout is reported as an ETIMEDOUT socket error
199 handleError(ETIMEDOUT);
// Called when the socket closes. A close while receiving a body is treated
// as a possible end-of-body; a close in other states fails the affected
// request with code 500. Requests still in sentRequests are moved back to
// the front of queuedRequests so they are re-sent on reconnect.
202 virtual void handleClose()
204 NetChat::handleClose();
206 // closing of the connection from the server side when getting the body,
207 bool canCloseState = (state == STATE_GETTING_BODY);
208 if (canCloseState && activeRequest) {
209 // check bodyTransferSize matches how much we actually transferred
210 if (bodyTransferSize > 0) {
211 if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
212 SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
213 << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
217 // force state here, so responseComplete can avoid closing the
219 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
220 setState(STATE_CLOSED);
223 if (state == STATE_WAITING_FOR_RESPONSE) {
// NOTE(review): this log statement dereferences sentRequests.front() before
// the assert below checks that the list is non-empty.
224 SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
225 << sentRequests.front()->url());
226 assert(!sentRequests.empty());
227 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
228 // no active request, but don't restore the front sent one
229 sentRequests.erase(sentRequests.begin());
233 activeRequest->setFailure(500, "server closed connection");
234 // remove the failed request from sentRequests, so it does
236 RequestList::iterator it = std::find(sentRequests.begin(),
237 sentRequests.end(), activeRequest);
238 if (it != sentRequests.end()) {
239 sentRequests.erase(it);
241 activeRequest = NULL;
242 _contentDecoder.reset();
245 setState(STATE_CLOSED);
248 if (sentRequests.empty()) {
252 // restore sent requests to the queue, so they will be re-sent
253 // when the connection opens again
254 queuedRequests.insert(queuedRequests.begin(),
255 sentRequests.begin(), sentRequests.end());
256 sentRequests.clear();
// Appends r to the queue and immediately tries to put it on the wire.
259 void queueRequest(const Request_ptr& r)
261 queuedRequests.push_back(r);
262 tryStartNextRequest();
// (method header not visible) Start of a response: the front of sentRequests
// becomes the active request and is given the status line; the per-response
// parsing state (noMessageBody, bodyTransferSize, chunkedTransfer, decoder)
// is reset.
267 assert(!sentRequests.empty());
268 assert(state == STATE_WAITING_FOR_RESPONSE);
270 activeRequest = sentRequests.front();
272 SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
273 activeRequest->responseStart(buffer);
274 } catch (sg_exception& e) {
279 setState(STATE_GETTING_HEADERS);
// 204 responses and responses to HEAD requests carry no message body
281 if (activeRequest->responseCode() == 204) {
282 noMessageBody = true;
283 } else if (activeRequest->method() == "HEAD") {
284 noMessageBody = true;
286 noMessageBody = false;
// -1 means "length not known yet"; processing of the headers may set it
289 bodyTransferSize = -1;
290 chunkedTransfer = false;
291 _contentDecoder.reset();
// Sends the request at the front of queuedRequests, if any: drops queued
// requests that are already complete, respects _maxPipelineLength, connects
// when the state is CLOSED, writes the request line, headers and body, then
// moves the request to sentRequests and recurses to pipeline the next one.
294 void tryStartNextRequest()
296 while( !queuedRequests.empty()
297 && queuedRequests.front()->isComplete() )
298 queuedRequests.pop_front();
300 if (queuedRequests.empty()) {
305 if (sentRequests.size() >= _maxPipelineLength) {
309 if (state == STATE_CLOSED) {
310 if (!connectToHost()) {
311 setState(STATE_SOCKET_ERROR);
315 SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
316 setTerminator("\r\n");
317 setState(STATE_IDLE);
320 Request_ptr r = queuedRequests.front();
323 std::stringstream headerData;
324 std::string path = r->path();
325 assert(!path.empty());
326 std::string query = r->query();
327 std::string bodyData;
// when a proxy is configured, the request line carries the absolute URL
329 if (!client->proxyHost().empty()) {
330 path = r->scheme() + "://" + r->host() + r->path();
333 if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
334 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
// NOTE(review): substr(1) throws std::out_of_range if query is empty -
// confirm that url-encoded requests always carry a query string.
335 bodyData = query.substr(1); // URL-encode, drop the leading '?'
336 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
337 headerData << "Content-Length:" << bodyData.size() << "\r\n";
339 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
340 if( r->hasBodyData() )
342 headerData << "Content-Length:" << r->bodyLength() << "\r\n";
343 headerData << "Content-Type:" << r->bodyType() << "\r\n";
347 headerData << "Host: " << r->hostAndPort() << "\r\n";
348 headerData << "User-Agent:" << client->userAgent() << "\r\n";
349 headerData << "Accept-Encoding: deflate, gzip\r\n";
350 if (!client->proxyAuth().empty()) {
351 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
354 BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
355 headerData << h.first << ": " << h.second << "\r\n";
358 headerData << "\r\n"; // final CRLF to terminate the headers
359 if (!bodyData.empty()) {
360 headerData << bodyData;
363 bool ok = push(headerData.str().c_str());
365 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
366 // we've over-stuffed the socket, give up for now, let things
367 // drain down before trying to start any more requests.
// request-supplied body data is sent in chunks of at most 4096 bytes
371 if( r->hasBodyData() )
372 for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
375 size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
378 if( !bufferSend(buf, len) )
382 "overflow the HTTP::Connection output buffer");
// NOTE(review): state is assigned directly here rather than via setState()
383 state = STATE_SOCKET_ERROR;
386 body_bytes_sent += len;
392 "HTTP asynchronous request body generation is unsupported");
397 SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
398 // successfully sent, remove from queue, and maybe send the next
399 queuedRequests.pop_front();
400 sentRequests.push_back(r);
401 if (state == STATE_IDLE) {
402 setState(STATE_WAITING_FOR_RESPONSE);
405 // pipelining, let's maybe send the next request right away
406 tryStartNextRequest();
// Raw incoming bytes: always counted on the client; forwarded to the
// content decoder only while a body or chunk payload is being received.
409 virtual void collectIncomingData(const char* s, int n)
412 client->receivedBytes(static_cast<unsigned int>(n));
414 if( (state == STATE_GETTING_BODY)
415 || (state == STATE_GETTING_CHUNKED_BYTES) )
416 _contentDecoder.receivedBytes(s, n);
// Dispatches on the current state each time the terminator set via
// setTerminator()/setByteCount() is reached.
421 virtual void foundTerminator(void)
425 case STATE_WAITING_FOR_RESPONSE:
429 case STATE_GETTING_HEADERS:
434 case STATE_GETTING_BODY:
438 case STATE_GETTING_CHUNKED:
439 processChunkHeader();
442 case STATE_GETTING_CHUNKED_BYTES:
// chunk payload finished: go back to line mode for the next chunk header
443 setTerminator("\r\n");
444 setState(STATE_GETTING_CHUNKED);
449 case STATE_GETTING_TRAILER:
455 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
// Idle timeout: only evaluated in the IDLE and CLOSED states; the threshold
// is 10 seconds of idleTime.
462 bool hasIdleTimeout() const
464 if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
468 assert(sentRequests.empty());
469 bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
// Error timeout: the counterpart for all other states, with a threshold of
// 30 seconds of idleTime.
473 bool hasErrorTimeout() const
475 if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
479 bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
483 bool hasError() const
485 return (state == STATE_SOCKET_ERROR);
// true when something is queued and the pipeline still has room
488 bool shouldStartNext() const
490 return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
// true while any request is queued or awaiting its response
493 bool isActive() const
495 return !queuedRequests.empty() || !sentRequests.empty();
498 std::string connectionId() const
500 return _connectionId;
// Logs the active, sent and queued requests of this connection at debug level.
503 void debugDumpRequests() const
505 SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
506 << "; state=" << state << ")");
508 SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
510 SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
513 BOOST_FOREACH(Request_ptr req, sentRequests) {
514 SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
517 BOOST_FOREACH(Request_ptr req, queuedRequests) {
518 SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
// States of the response-parsing state machine. Other code in this class
// also uses STATE_IDLE, STATE_GETTING_BODY and STATE_SOCKET_ERROR, whose
// enumerator lines are not visible here.
522 enum ConnectionState {
524 STATE_WAITING_FOR_RESPONSE,
525 STATE_GETTING_HEADERS,
527 STATE_GETTING_CHUNKED,
528 STATE_GETTING_CHUNKED_BYTES,
529 STATE_GETTING_TRAILER,
531 STATE_CLOSED ///< connection should be closed now
534 void setState(ConnectionState newState)
536 if (state == newState) {
// (method header not visible; this is presumably connectToHost()) opens the
// socket and connects to host:port, logging a warning on either failure
545 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
548 SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
552 if (connect(host.c_str(), port) != 0) {
553 SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
// (method header not visible) Handles one header line held in `buffer`:
// an empty line ends the headers, otherwise the line is split at the first
// ':' into a lower-cased key and a stripped value.
563 std::string h = strutils::simplify(buffer);
564 if (h.empty()) { // blank line terminates headers
// NOTE(review): find() returns an unsigned size type; storing it in an int
// relies on npos converting to a negative value - confirm the check that
// follows.
569 int colonPos = buffer.find(':');
571 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
575 std::string key = strutils::simplify(buffer.substr(0, colonPos));
576 std::string lkey = boost::to_lower_copy(key);
577 std::string value = strutils::strip(buffer.substr(colonPos + 1));
579 // only consider these if getting headers (as opposed to trailers
580 // of a chunked transfer)
581 if (state == STATE_GETTING_HEADERS) {
582 if (lkey == "content-length") {
584 int sz = strutils::to_int(value);
// content-length only sets the transfer size if nothing has set it already
585 if (bodyTransferSize <= 0) {
586 bodyTransferSize = sz;
588 activeRequest->setResponseLength(sz);
589 } else if (lkey == "transfer-length") {
590 bodyTransferSize = strutils::to_int(value);
591 } else if (lkey == "transfer-encoding") {
592 processTransferEncoding(value);
593 } else if (lkey == "content-encoding") {
594 _contentDecoder.setEncoding(value);
598 activeRequest->responseHeader(lkey, value);
// Only "chunked" is understood; any other transfer encoding is logged as
// unsupported.
601 void processTransferEncoding(const std::string& te)
603 if (te == "chunked") {
604 chunkedTransfer = true;
606 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Parses a chunk-size line (hexadecimal, optionally followed by ';' and
// extensions). Size 0 starts the trailer; otherwise the next chunkSize
// bytes are read as chunk payload.
611 void processChunkHeader()
613 if (buffer.empty()) {
614 // blank line after chunk data
// NOTE(review): as in header parsing, an unsigned find() result is stored
// in an int.
619 int semiPos = buffer.find(';');
621 // extensions ignored for the moment
622 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
624 chunkSize = strutils::to_int(buffer, 16);
628 if (chunkSize == 0) { // trailer start
629 setState(STATE_GETTING_TRAILER);
633 setState(STATE_GETTING_CHUNKED_BYTES);
634 setByteCount(chunkSize);
637 void processTrailer()
639 if (buffer.empty()) {
645 // process as a normal header
// All headers received: notifies the request, initialises the content
// decoder, disables pipelining if the server does not support it, and
// selects how the body will be read (chunked, no body, or byte-counted).
649 void headersComplete()
651 activeRequest->responseHeadersComplete();
652 _contentDecoder.initWithRequest(activeRequest);
654 if (!activeRequest->serverSupportsPipelining()) {
655 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
656 _maxPipelineLength = 1;
659 if (chunkedTransfer) {
660 setState(STATE_GETTING_CHUNKED);
661 } else if (noMessageBody || (bodyTransferSize == 0)) {
662 // force the state to GETTING_BODY, to simplify logic in
663 // responseComplete and handleClose
664 setState(STATE_GETTING_BODY);
667 setByteCount(bodyTransferSize); // may be -1, that's fine
668 setState(STATE_GETTING_BODY);
// Finishes the active request: flushes the decoder, removes the request from
// sentRequests, updates the connection state, and only then notifies the
// request and the client.
672 void responseComplete()
// keep our own reference: activeRequest is cleared before the notification
674 Request_ptr completedRequest = activeRequest;
675 _contentDecoder.finish();
677 assert(sentRequests.front() == activeRequest);
678 sentRequests.pop_front();
679 bool doClose = activeRequest->closeAfterComplete();
680 activeRequest = NULL;
682 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
684 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
685 // this will bring us into handleClose() above, which updates
686 // state to STATE_CLOSED
689 // if we have additional requests waiting, try to start them now
690 tryStartNextRequest();
694 if (state != STATE_CLOSED) {
695 setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
698 // notify request after we change state, so this connection is idle
699 // if completion triggers other requests (which is likely)
700 completedRequest->responseComplete();
701 client->requestFinished(this);
703 setTerminator("\r\n");
// request whose response is currently being parsed (NULL when none)
707 Request_ptr activeRequest;
708 ConnectionState state;
// expected body size in bytes; -1 while unknown
712 int bodyTransferSize;
// reference time for hasIdleTimeout() / hasErrorTimeout()
713 SGTimeStamp idleTime;
714 bool chunkedTransfer;
// requests not yet written to the socket
717 RequestList queuedRequests;
// requests written to the socket, oldest first, awaiting their responses
718 RequestList sentRequests;
720 ContentDecoder _contentDecoder;
721 std::string _connectionId;
722 unsigned int _maxPipelineLength;
724 #endif // of !ENABLE_CURL
// NOTE(review): the function header is not visible here; these statements are
// presumably the body of the Client constructor. They set the default limits
// (4 connections in total, 4 per host, pipeline depth 5), reset the transfer
// statistics and install the default user-agent string.
730 d->maxConnections = 4;
731 d->maxHostConnections = 4;
732 d->bytesTransferred = 0;
733 d->lastTransferRate = 0;
734 d->timeTransferSample.stamp();
735 d->totalBytesDownloaded = 0;
736 d->maxPipelineDepth = 5;
737 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
738 #if defined(ENABLE_CURL)
// curl_global_init() is guarded by a function-local flag so that it runs
// only on the first pass through this code
739 static bool didInitCurlGlobal = false;
740 if (!didInitCurlGlobal) {
741 curl_global_init(CURL_GLOBAL_ALL);
742 didInitCurlGlobal = true;
745 d->createCurlMulti();
// NOTE(review): function header not visible; presumably the Client
// destructor, which releases the curl multi handle.
751 #if defined(ENABLE_CURL)
752 curl_multi_cleanup(d->curlMulti);
// Sets the limit on the total number of connections; with ENABLE_CURL the
// value is also applied to the multi handle as CURLMOPT_MAX_TOTAL_CONNECTIONS.
756 void Client::setMaxConnections(unsigned int maxCon)
758 d->maxConnections = maxCon;
759 #if defined(ENABLE_CURL)
760 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
// Sets the limit on connections per host; with ENABLE_CURL the value is also
// applied to the multi handle as CURLMOPT_MAX_HOST_CONNECTIONS.
764 void Client::setMaxHostConnections(unsigned int maxHostCon)
766 d->maxHostConnections = maxHostCon;
767 #if defined(ENABLE_CURL)
768 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
// Sets the maximum pipeline depth. With ENABLE_CURL it is applied to the
// multi handle as CURLMOPT_MAX_PIPELINE_LENGTH; the loop below instead
// pushes it to each existing Connection.
772 void Client::setMaxPipelineDepth(unsigned int depth)
774 d->maxPipelineDepth = depth;
775 #if defined(ENABLE_CURL)
776 curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
778 ConnectionDict::iterator it = d->connections.begin();
// NOTE(review): the for statement has no increment clause and no ++it is
// visible in the loop body; if the iterator is never advanced this loop does
// not terminate once at least one connection exists - confirm.
779 for (; it != d->connections.end(); ) {
780 it->second->setMaxPipelineLength(depth);
// Drives all outstanding transfers.
// With ENABLE_CURL: runs curl_multi_perform(), then drains the multi handle's
// message queue, completing or failing each finished request and releasing
// its easy handle together with the reference taken in makeRequest().
// The remaining code polls the NetChannelPoller (or just sleeps when it has
// no channels), removes connections that timed out or are inactive while
// other requests are waiting, lets the others start their next request, and
// re-submits pending requests when there is room for another connection.
// waitTimeout is passed to SGTimeStamp::sleepForMSec() / poller.poll(), i.e.
// it is in milliseconds.
785 void Client::update(int waitTimeout)
787 #if defined(ENABLE_CURL)
788 int remainingActive, messagesInQueue;
789 curl_multi_perform(d->curlMulti, &remainingActive);
790 d->haveActiveRequests = (remainingActive > 0);
793 while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
794 if (msg->msg == CURLMSG_DONE) {
796 CURL *e = msg->easy_handle;
// the Request pointer was stored as CURLOPT_PRIVATE in makeRequest()
797 curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
800 curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
802 if (msg->data.result == 0) {
803 req->responseComplete();
// NOTE(review): the failure is written straight to stderr here, whereas the
// rest of the file logs through SG_LOG
805 fprintf(stderr, "Result: %d - %s\n",
806 msg->data.result, curl_easy_strerror(msg->data.result));
807 req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
810 curl_multi_remove_handle(d->curlMulti, e);
812 // balance the reference we take in makeRequest
813 SGReferenced::put(req);
814 curl_easy_cleanup(e);
817 SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
819 } // of curl message processing loop
// with nothing to poll, just wait out the timeout
821 if (!d->poller.hasChannels() && (waitTimeout > 0)) {
822 SGTimeStamp::sleepForMSec(waitTimeout);
824 d->poller.poll(waitTimeout);
827 bool waitingRequests = !d->pendingRequests.empty();
828 ConnectionDict::iterator it = d->connections.begin();
829 for (; it != d->connections.end(); ) {
830 Connection* con = it->second;
831 if (con->hasIdleTimeout() ||
833 con->hasErrorTimeout() ||
834 (!con->isActive() && waitingRequests))
836 if (con->hasErrorTimeout()) {
837 // tell the connection we're timing it out
838 con->handleTimeout();
841 // connection has been idle for a while, clean it up
842 // (or if we have requests waiting for a different host,
843 // or an error condition
// advance before erasing so that `it` stays valid
844 ConnectionDict::iterator del = it++;
846 d->connections.erase(del);
848 if (it->second->shouldStartNext()) {
849 it->second->tryStartNextRequest();
853 } // of connection iteration
855 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
// work on a copy: re-submitting may append to pendingRequests again
856 RequestList waiting(d->pendingRequests);
857 d->pendingRequests.clear();
859 // re-submit all waiting requests in order; this takes care of
860 // finding multiple pending items targeted to the same (new)
862 BOOST_FOREACH(Request_ptr req, waiting) {
// Submits request r. Requests that are already complete are checked for
// first, and URLs without "://" fail with EINVAL.
// With ENABLE_CURL: a curl easy handle is configured from the request
// (URL, callbacks, user agent, proxy, method, body, headers) and added to the
// multi handle; an extra reference on r is taken here and released in
// update() when the transfer finishes.
// The remaining code accepts only "http://" URLs and assigns the request to a
// Connection keyed by "<host>-<port>", reusing, randomly selecting or
// allocating one depending on the connection limits, or parks it in
// pendingRequests when the overall limit is reached and no connection for
// that key exists.
869 void Client::makeRequest(const Request_ptr& r)
871 if( r->isComplete() )
874 if( r->url().find("://") == std::string::npos ) {
875 r->setFailure(EINVAL, "malformed URL");
881 #if defined(ENABLE_CURL)
882 CURL* curlRequest = curl_easy_init();
883 curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
885 // manually increase the ref count of the request
886 SGReferenced::get(r.get());
887 curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
888 // disable built-in libCurl progress feedback
889 curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
// the raw Request pointer is the user data for both callbacks
891 curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
892 curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
893 curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
894 curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
896 curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
897 curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
899 if (!d->proxy.empty()) {
900 curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
901 curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
903 if (!d->proxyAuth.empty()) {
904 curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
905 curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
// the method name is compared case-insensitively
909 std::string method = boost::to_lower_copy(r->method());
910 if (method == "get") {
911 curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
912 } else if (method == "put") {
913 curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
914 curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
915 } else if (method == "post") {
916 // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
// NOTE(review): the page referenced above documents CURLOPT_POST, but the
// option set here is CURLOPT_HTTPPOST, which libcurl documents as taking a
// struct curl_httppost* rather than a flag - confirm which was intended.
917 curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
// NOTE(review): substr(1) throws std::out_of_range when the query is empty,
// and resize(queryPos) below is given npos when the URL has no '?' - this
// assumes every POST URL carries a query string; confirm.
919 std::string q = r->query().substr(1);
920 curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
922 // reset URL to exclude query pieces
923 std::string urlWithoutQuery = r->url();
924 std::string::size_type queryPos = urlWithoutQuery.find('?');
925 urlWithoutQuery.resize(queryPos);
926 curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
928 curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
// NOTE(review): no curl_slist_free_all() for this list is visible in this
// file; confirm the list is released once the transfer has finished.
931 struct curl_slist* headerList = NULL;
932 if (r->hasBodyData() && (method != "post")) {
933 curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
934 curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
935 curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
936 curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
937 std::string h = "Content-Type:" + r->bodyType();
938 headerList = curl_slist_append(headerList, h.c_str());
941 StringMap::const_iterator it;
942 for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
943 std::string h = it->first + ": " + it->second;
944 headerList = curl_slist_append(headerList, h.c_str());
947 if (headerList != NULL) {
948 curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
951 curl_multi_add_handle(d->curlMulti, curlRequest);
952 d->haveActiveRequests = true;
954 // FIXME - premature?
958 if( r->url().find("http://") != 0 ) {
959 r->setFailure(EINVAL, "only HTTP protocol is supported");
963 std::string host = r->host();
964 int port = r->port();
965 if (!d->proxy.empty()) {
970 Connection* con = NULL;
// connections are keyed by "<host>-<port>"
971 std::stringstream ss;
972 ss << host << "-" << port;
973 std::string connectionId = ss.str();
974 bool havePending = !d->pendingRequests.empty();
975 bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
976 ConnectionDict::iterator consEnd = d->connections.end();
978 // assign request to an existing Connection.
979 // various options exist here, examined in order
980 ConnectionDict::iterator it = d->connections.find(connectionId);
981 if (atConnectionsLimit && (it == consEnd)) {
982 // maximum number of connections active, queue this request
983 // when a connection goes inactive, we'll start this one
984 d->pendingRequests.push_back(r);
988 // scan for an idle Connection to the same host (likely if we're
989 // retrieving multiple resources from the same host in quick succession)
990 // if we have pending requests (waiting for a free Connection), then
991 // force new requests on this id to always use the first Connection
992 // (instead of the random selection below). This ensures that when
993 // there's pressure on the number of connections to keep alive, one
994 // host can't DoS every other.
996 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
997 if (havePending || !it->second->isActive()) {
1003 bool atHostConnectionsLimit = (count >= d->maxHostConnections);
1005 if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
1006 // all current connections are busy (active), and we don't
1007 // have free connections to allocate, so let's assign to
1008 // an existing one randomly. Ideally we'd used whichever one will
1009 // complete first but we don't have that info.
1010 int index = rand() % count;
// walk forward `index` entries from the first connection with this key
1011 for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
1015 // allocate a new connection object
// function-local counter appended to the key to build the connection's ID
1017 static int connectionSuffx = 0;
1019 std::stringstream ss;
1020 ss << connectionId << "-" << connectionSuffx++;
1022 SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
1023 con = new Connection(this, ss.str());
1024 con->setServer(host, port);
1025 con->setMaxPipelineLength(d->maxPipelineDepth);
1026 d->poller.addChannel(con);
1027 d->connections.insert(d->connections.end(),
1028 ConnectionDict::value_type(connectionId, con));
1031 SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
1032 con->queueRequest(r);
1036 //------------------------------------------------------------------------------
// Creates a FileRequest for `url` targeting `filename`.
// NOTE(review): the statements that follow the construction (presumably
// submitting the request and returning it) are not visible here.
1037 FileRequestRef Client::save( const std::string& url,
1038 const std::string& filename )
1040 FileRequestRef req = new FileRequest(url, filename);
1045 //------------------------------------------------------------------------------
// Creates a MemoryRequest for `url`.
// NOTE(review): the statements that follow the construction (presumably
// submitting the request and returning it) are not visible here.
1046 MemoryRequestRef Client::load(const std::string& url)
1048 MemoryRequestRef req = new MemoryRequest(url);
1053 void Client::requestFinished(Connection* con)
1058 void Client::setUserAgent(const std::string& ua)
// Returns the stored user-agent string (by reference to the private copy).
1063 const std::string& Client::userAgent() const
1065 return d->userAgent;
1068 const std::string& Client::proxyHost() const
// Returns the stored proxy credentials (by reference to the private copy).
1073 const std::string& Client::proxyAuth() const
1075 return d->proxyAuth;
// Stores the proxy settings used for subsequent requests. The visible
// assignments store the port and the credentials; the parameter line for
// `port` and the assignment of the proxy host are not visible here.
1078 void Client::setProxy( const std::string& proxy,
1080 const std::string& auth )
1083 d->proxyPort = port;
1084 d->proxyAuth = auth;
// Reports whether any transfer is still in progress. With ENABLE_CURL this is
// the flag maintained by update() / makeRequest(); the loop below instead
// returns true as soon as one Connection reports isActive().
1087 bool Client::hasActiveRequests() const
1089 #if defined(ENABLE_CURL)
1090 return d->haveActiveRequests;
1092 ConnectionDict::const_iterator it = d->connections.begin();
1093 for (; it != d->connections.end(); ++it) {
1094 if (it->second->isActive()) return true;
// Accounts `count` received bytes in both the current rate-sampling window
// and the running download total.
1101 void Client::receivedBytes(unsigned int count)
1103 d->bytesTransferred += count;
1104 d->totalBytesDownloaded += count;
// Returns a smoothed estimate of the download rate in bytes per second,
// computed from the bytes counted since the last sample. Although declared
// const, it updates the sampling state held behind `d`.
1107 unsigned int Client::transferRateBytesPerSec() const
// milliseconds elapsed since the last sample
1109 unsigned int e = d->timeTransferSample.elapsedMSec();
1111 // too long a window, ignore
1112 d->timeTransferSample.stamp();
1113 d->bytesTransferred = 0;
1114 d->lastTransferRate = 0;
1118 if (e < 100) { // avoid really narrow windows
1119 return d->lastTransferRate;
// NOTE(review): bytesTransferred is an unsigned int, so the multiplication by
// 1000 is done in unsigned int arithmetic and wraps once more than roughly
// 4.29 MB arrive within one window (where unsigned int is 32 bits) - confirm
// whether that is acceptable.
1122 unsigned int ratio = (d->bytesTransferred * 1000) / e;
1123 // run a low-pass filter
// weights the previous rate by (400 - e) and the new sample by e
1124 unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1127 d->timeTransferSample.stamp();
1128 d->bytesTransferred = 0;
1129 d->lastTransferRate = smoothed;
// Returns the running total accumulated by receivedBytes().
1133 uint64_t Client::totalBytesDownloaded() const
1135 return d->totalBytesDownloaded;
// libcurl write callback (installed via CURLOPT_WRITEFUNCTION in
// makeRequest()). userdata is the Request the transfer belongs to; the
// size * nmemb bytes at ptr are handed to it as body data and counted on the
// owning Client.
1138 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1140 size_t byteSize = size * nmemb;
1141 Request* req = static_cast<Request*>(userdata);
1142 req->processBodyBytes(ptr, byteSize);
1144 Client* cl = req->http();
1146 cl->receivedBytes(byteSize);
// libcurl read callback (installed via CURLOPT_READFUNCTION in makeRequest())
// that supplies request body data. userdata is the Request; at most
// size * nmemb bytes are copied into ptr.
// NOTE(review): the offset passed to getBodyData() is always 0, so a body
// larger than one callback buffer would appear to be re-read from its start
// on every call - confirm how getBodyData() treats the offset.
1152 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1154 size_t maxBytes = size * nmemb;
1155 Request* req = static_cast<Request*>(userdata);
1156 size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
// libcurl header callback (installed via CURLOPT_HEADERFUNCTION in
// makeRequest()); called with one header line per invocation. userdata is the
// Request. While the request is in the OPENED state the line is passed to
// responseStart(); later lines are split at the first ':' and forwarded as
// lower-cased key / stripped value. Headers belonging to a 100-continue
// response are skipped.
1160 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1162 size_t byteSize = size * nitems;
1163 Request* req = static_cast<Request*>(userdata);
1164 std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1166 if (req->readyState() == HTTP::Request::OPENED) {
1167 req->responseStart(h);
1172 // got a 100-continue response; restart
1173 if (req->responseCode() == 100) {
// back to OPENED so that the next line is parsed as a new status line
1174 req->setReadyState(HTTP::Request::OPENED);
1178 req->responseHeadersComplete();
1182 if (req->responseCode() == 100) {
1183 return byteSize; // skip headers associated with 100-continue status
1186 size_t colonPos = h.find(':');
1187 if (colonPos == std::string::npos) {
1188 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1192 std::string key = strutils::simplify(h.substr(0, colonPos));
1193 std::string lkey = boost::to_lower_copy(key);
1194 std::string value = strutils::strip(h.substr(colonPos + 1));
1196 req->responseHeader(lkey, value);
// Logs the request state of every connection, framed by two INFO-level marker
// lines. Nothing specific to the ENABLE_CURL build is visible here.
1200 void Client::debugDumpRequests()
1202 #if defined(ENABLE_CURL)
1205 SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1206 ConnectionDict::iterator it = d->connections.begin();
1207 for (; it != d->connections.end(); ++it) {
1208 it->second->debugDumpRequests();
1210 SG_LOG(SG_IO, SG_INFO, "==");
// Drops all connections. With ENABLE_CURL the multi handle is destroyed and
// recreated with the current limits; the remaining code walks the connection
// map and then clears it.
// NOTE(review): the loop body is not visible here; presumably it releases
// each Connection before the map is cleared - confirm.
1214 void Client::clearAllConnections()
1216 #if defined(ENABLE_CURL)
1217 curl_multi_cleanup(d->curlMulti);
1218 d->createCurlMulti();
1220 ConnectionDict::iterator it = d->connections.begin();
1221 for (; it != d->connections.end(); ++it) {
1224 d->connections.clear();
1228 } // of namespace HTTP
1230 } // of namespace simgear