2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24 #include "HTTPClient.hxx"
25 #include "HTTPFileRequest.hxx"
29 #include <cstdlib> // rand()
35 #include <boost/foreach.hpp>
36 #include <boost/algorithm/string/case_conv.hpp>
38 #include <simgear/io/sg_netChat.hxx>
39 #include <simgear/io/HTTPContentDecode.hxx>
40 #include <simgear/misc/strutils.hxx>
41 #include <simgear/compiler.h>
42 #include <simgear/debug/logstream.hxx>
43 #include <simgear/timing/timestamp.hxx>
44 #include <simgear/structure/exception.hxx>
46 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
49 # if !defined(SIMGEAR_VERSION)
50 # define SIMGEAR_VERSION "simgear-development"
60 extern const int DEFAULT_HTTP_PORT = 80;
61 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
62 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
65 typedef std::multimap<std::string, Connection*> ConnectionDict;
66 typedef std::list<Request_ptr> RequestList;
// Classify an HTTP status code. Only the 2xx "Success" class (200-299)
// counts as success; every other value -- including 1xx informational and
// 3xx redirect codes -- is reported as a failure.
static bool isFailureStatus(int httpStatus)
{
    const bool inSuccessClass = (httpStatus >= 200) && (httpStatus < 300);
    return !inSuccessClass;
}
74 class Client::ClientPrivate
77 std::string userAgent;
80 std::string proxyAuth;
81 NetChannelPoller poller;
82 unsigned int maxConnections;
84 RequestList pendingRequests;
86 // connections by host (potentially more than one)
87 ConnectionDict connections;
89 SGTimeStamp timeTransferSample;
90 unsigned int bytesTransferred;
91 unsigned int lastTransferRate;
92 uint64_t totalBytesDownloaded;
95 class Connection : public NetChat
98 Connection(Client* pr) :
101 port(DEFAULT_HTTP_PORT)
105 virtual ~Connection()
109 virtual void handleBufferRead (NetBuffer& buffer)
111 if( !activeRequest || !activeRequest->isComplete() )
112 return NetChat::handleBufferRead(buffer);
114 // Request should be aborted (signaled by setting its state to complete).
116 // force the state to GETTING_BODY, to simplify logic in
117 // responseComplete and handleClose
118 state = STATE_GETTING_BODY;
122 void setServer(const std::string& h, short p)
128 // socket-level errors
129 virtual void handleError(int error)
131 if (error == ENOENT) {
132 // name lookup failure
133 // we won't have an active request yet, so the logic below won't
134 // fire to actually call setFailure. Let's fail all of the requests
135 BOOST_FOREACH(Request_ptr req, sentRequests) {
136 req->setFailure(error, "hostname lookup failure");
139 BOOST_FOREACH(Request_ptr req, queuedRequests) {
140 req->setFailure(error, "hostname lookup failure");
143 // name lookup failure, abandon all requests on this connection
144 sentRequests.clear();
145 queuedRequests.clear();
148 NetChat::handleError(error);
150 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
151 activeRequest->setFailure(error, "socket error");
152 activeRequest = NULL;
153 _contentDecoder.reset();
156 state = STATE_SOCKET_ERROR;
159 virtual void handleClose()
161 NetChat::handleClose();
163 // closing of the connection from the server side when getting the body,
164 bool canCloseState = (state == STATE_GETTING_BODY);
165 if (canCloseState && activeRequest) {
166 // force state here, so responseComplete can avoid closing the
168 state = STATE_CLOSED;
172 activeRequest->setFailure(500, "server closed connection");
173 // remove the failed request from sentRequests, so it does
175 RequestList::iterator it = std::find(sentRequests.begin(),
176 sentRequests.end(), activeRequest);
177 if (it != sentRequests.end()) {
178 sentRequests.erase(it);
180 activeRequest = NULL;
181 _contentDecoder.reset();
184 state = STATE_CLOSED;
187 if (sentRequests.empty()) {
191 // restore sent requests to the queue, so they will be re-sent
192 // when the connection opens again
193 queuedRequests.insert(queuedRequests.begin(),
194 sentRequests.begin(), sentRequests.end());
195 sentRequests.clear();
200 NetChat::handleError(ETIMEDOUT);
202 SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
203 activeRequest->setFailure(ETIMEDOUT, "socket timeout");
204 activeRequest = NULL;
205 _contentDecoder.reset();
208 state = STATE_SOCKET_ERROR;
211 void queueRequest(const Request_ptr& r)
213 queuedRequests.push_back(r);
214 tryStartNextRequest();
219 assert(!sentRequests.empty());
220 assert(state == STATE_WAITING_FOR_RESPONSE);
222 activeRequest = sentRequests.front();
223 activeRequest->responseStart(buffer);
224 if (isFailureStatus(activeRequest->responseCode())) {
229 state = STATE_GETTING_HEADERS;
231 if (activeRequest->responseCode() == 204) {
232 noMessageBody = true;
233 } else if (activeRequest->method() == "HEAD") {
234 noMessageBody = true;
236 noMessageBody = false;
239 bodyTransferSize = -1;
240 chunkedTransfer = false;
241 _contentDecoder.reset();
244 void tryStartNextRequest()
246 while( !queuedRequests.empty()
247 && queuedRequests.front()->isComplete() )
248 queuedRequests.pop_front();
250 if (queuedRequests.empty()) {
255 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
259 if (state == STATE_CLOSED) {
260 if (!connectToHost()) {
264 setTerminator("\r\n");
268 Request_ptr r = queuedRequests.front();
271 std::stringstream headerData;
272 std::string path = r->path();
273 assert(!path.empty());
274 std::string query = r->query();
275 std::string bodyData;
277 if (!client->proxyHost().empty()) {
278 path = r->scheme() + "://" + r->host() + r->path();
281 if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
282 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
283 bodyData = query.substr(1); // URL-encode, drop the leading '?'
284 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
285 headerData << "Content-Length:" << bodyData.size() << "\r\n";
287 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
288 if( r->hasBodyData() )
290 headerData << "Content-Length:" << r->bodyLength() << "\r\n";
291 headerData << "Content-Type:" << r->bodyType() << "\r\n";
295 headerData << "Host: " << r->hostAndPort() << "\r\n";
296 headerData << "User-Agent:" << client->userAgent() << "\r\n";
297 headerData << "Accept-Encoding: deflate, gzip\r\n";
298 if (!client->proxyAuth().empty()) {
299 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
302 BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
303 headerData << h.first << ": " << h.second << "\r\n";
306 headerData << "\r\n"; // final CRLF to terminate the headers
307 if (!bodyData.empty()) {
308 headerData << bodyData;
311 bool ok = push(headerData.str().c_str());
313 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
314 // we've over-stuffed the socket, give up for now, let things
315 // drain down before trying to start any more requests.
319 if( r->hasBodyData() )
320 for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
323 size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
326 if( !bufferSend(buf, len) )
330 "overflow the HTTP::Connection output buffer");
331 state = STATE_SOCKET_ERROR;
334 body_bytes_sent += len;
340 "HTTP asynchronous request body generation is unsupported");
345 // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
346 // "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
347 // "\n\t on connection " << this);
348 // successfully sent, remove from queue, and maybe send the next
349 queuedRequests.pop_front();
350 sentRequests.push_back(r);
351 state = STATE_WAITING_FOR_RESPONSE;
353 // pipelining, let's maybe send the next request right away
354 tryStartNextRequest();
357 virtual void collectIncomingData(const char* s, int n)
360 client->receivedBytes(static_cast<unsigned int>(n));
362 if( (state == STATE_GETTING_BODY)
363 || (state == STATE_GETTING_CHUNKED_BYTES) )
364 _contentDecoder.receivedBytes(s, n);
369 virtual void foundTerminator(void)
373 case STATE_WAITING_FOR_RESPONSE:
377 case STATE_GETTING_HEADERS:
382 case STATE_GETTING_BODY:
386 case STATE_GETTING_CHUNKED:
387 processChunkHeader();
390 case STATE_GETTING_CHUNKED_BYTES:
391 setTerminator("\r\n");
392 state = STATE_GETTING_CHUNKED;
397 case STATE_GETTING_TRAILER:
403 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
410 bool hasIdleTimeout() const
412 if (state != STATE_IDLE) {
416 assert(sentRequests.empty());
417 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
420 bool hasErrorTimeout() const
422 if (state == STATE_IDLE) {
426 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
429 bool hasError() const
431 return (state == STATE_SOCKET_ERROR);
434 bool shouldStartNext() const
436 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
439 bool isActive() const
441 return !queuedRequests.empty() || !sentRequests.empty();
446 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
449 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
453 if (connect(host.c_str(), port) != 0) {
463 std::string h = strutils::simplify(buffer);
464 if (h.empty()) { // blank line terminates headers
469 int colonPos = buffer.find(':');
471 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
475 std::string key = strutils::simplify(buffer.substr(0, colonPos));
476 std::string lkey = boost::to_lower_copy(key);
477 std::string value = strutils::strip(buffer.substr(colonPos + 1));
479 // only consider these if getting headers (as opposed to trailers
480 // of a chunked transfer)
481 if (state == STATE_GETTING_HEADERS) {
482 if (lkey == "content-length") {
484 int sz = strutils::to_int(value);
485 if (bodyTransferSize <= 0) {
486 bodyTransferSize = sz;
488 activeRequest->setResponseLength(sz);
489 } else if (lkey == "transfer-length") {
490 bodyTransferSize = strutils::to_int(value);
491 } else if (lkey == "transfer-encoding") {
492 processTransferEncoding(value);
493 } else if (lkey == "content-encoding") {
494 _contentDecoder.setEncoding(value);
498 activeRequest->responseHeader(lkey, value);
501 void processTransferEncoding(const std::string& te)
503 if (te == "chunked") {
504 chunkedTransfer = true;
506 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
511 void processChunkHeader()
513 if (buffer.empty()) {
514 // blank line after chunk data
519 int semiPos = buffer.find(';');
521 // extensions ignored for the moment
522 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
524 chunkSize = strutils::to_int(buffer, 16);
528 if (chunkSize == 0) { // trailer start
529 state = STATE_GETTING_TRAILER;
533 state = STATE_GETTING_CHUNKED_BYTES;
534 setByteCount(chunkSize);
537 void processTrailer()
539 if (buffer.empty()) {
545 // process as a normal header
549 void headersComplete()
551 activeRequest->responseHeadersComplete();
552 _contentDecoder.initWithRequest(activeRequest);
554 if (chunkedTransfer) {
555 state = STATE_GETTING_CHUNKED;
556 } else if (noMessageBody || (bodyTransferSize == 0)) {
557 // force the state to GETTING_BODY, to simplify logic in
558 // responseComplete and handleClose
559 state = STATE_GETTING_BODY;
562 setByteCount(bodyTransferSize); // may be -1, that's fine
563 state = STATE_GETTING_BODY;
567 void responseComplete()
569 Request_ptr completedRequest = activeRequest;
570 _contentDecoder.finish();
572 assert(sentRequests.front() == activeRequest);
573 sentRequests.pop_front();
574 bool doClose = activeRequest->closeAfterComplete();
575 activeRequest = NULL;
577 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
579 // this will bring us into handleClose() above, which updates
580 // state to STATE_CLOSED
583 // if we have additional requests waiting, try to start them now
584 tryStartNextRequest();
588 if (state != STATE_CLOSED) {
589 state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
592 // notify request after we change state, so this connection is idle
593 // if completion triggers other requests (which is likely)
594 // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
595 completedRequest->responseComplete();
596 client->requestFinished(this);
598 setTerminator("\r\n");
601 enum ConnectionState {
603 STATE_WAITING_FOR_RESPONSE,
604 STATE_GETTING_HEADERS,
606 STATE_GETTING_CHUNKED,
607 STATE_GETTING_CHUNKED_BYTES,
608 STATE_GETTING_TRAILER,
610 STATE_CLOSED ///< connection should be closed now
614 Request_ptr activeRequest;
615 ConnectionState state;
619 int bodyTransferSize;
620 SGTimeStamp idleTime;
621 bool chunkedTransfer;
624 RequestList queuedRequests;
625 RequestList sentRequests;
627 ContentDecoder _contentDecoder;
634 d->maxConnections = 4;
635 d->bytesTransferred = 0;
636 d->lastTransferRate = 0;
637 d->timeTransferSample.stamp();
638 d->totalBytesDownloaded = 0;
640 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
647 void Client::setMaxConnections(unsigned int maxCon)
650 throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
653 d->maxConnections = maxCon;
656 void Client::update(int waitTimeout)
658 if (!d->poller.hasChannels() && (waitTimeout > 0)) {
659 SGTimeStamp::sleepForMSec(waitTimeout);
661 d->poller.poll(waitTimeout);
664 bool waitingRequests = !d->pendingRequests.empty();
665 ConnectionDict::iterator it = d->connections.begin();
666 for (; it != d->connections.end(); ) {
667 Connection* con = it->second;
668 if (con->hasIdleTimeout() ||
670 con->hasErrorTimeout() ||
671 (!con->isActive() && waitingRequests))
673 if (con->hasErrorTimeout()) {
674 // tell the connection we're timing it out
675 con->handleTimeout();
678 // connection has been idle for a while, clean it up
679 // (or if we have requests waiting for a different host,
680 // or an error condition
681 ConnectionDict::iterator del = it++;
683 d->connections.erase(del);
685 if (it->second->shouldStartNext()) {
686 it->second->tryStartNextRequest();
690 } // of connection iteration
692 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
693 RequestList waiting(d->pendingRequests);
694 d->pendingRequests.clear();
696 // re-submit all waiting requests in order; this takes care of
697 // finding multiple pending items targetted to the same (new)
699 BOOST_FOREACH(Request_ptr req, waiting) {
705 void Client::makeRequest(const Request_ptr& r)
707 if( r->isComplete() )
710 if( r->url().find("://") == std::string::npos ) {
711 r->setFailure(EINVAL, "malformed URL");
715 if( r->url().find("http://") != 0 ) {
716 r->setFailure(EINVAL, "only HTTP protocol is supported");
720 std::string host = r->host();
721 int port = r->port();
722 if (!d->proxy.empty()) {
727 Connection* con = NULL;
728 std::stringstream ss;
729 ss << host << "-" << port;
730 std::string connectionId = ss.str();
731 bool havePending = !d->pendingRequests.empty();
732 bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
733 ConnectionDict::iterator consEnd = d->connections.end();
735 // assign request to an existing Connection.
736 // various options exist here, examined in order
737 ConnectionDict::iterator it = d->connections.find(connectionId);
738 if (atConnectionsLimit && (it == consEnd)) {
739 // maximum number of connections active, queue this request
740 // when a connection goes inactive, we'll start this one
741 d->pendingRequests.push_back(r);
745 // scan for an idle Connection to the same host (likely if we're
746 // retrieving multiple resources from the same host in quick succession)
747 // if we have pending requests (waiting for a free Connection), then
748 // force new requests on this id to always use the first Connection
749 // (instead of the random selection below). This ensures that when
750 // there's pressure on the number of connections to keep alive, one
751 // host can't DoS every other.
753 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
754 if (havePending || !it->second->isActive()) {
760 if (!con && atConnectionsLimit) {
761 // all current connections are busy (active), and we don't
762 // have free connections to allocate, so let's assign to
763 // an existing one randomly. Ideally we'd used whichever one will
764 // complete first but we don't have that info.
765 int index = rand() % count;
766 for (it = d->connections.find(connectionId); index > 0; --index) { ; }
770 // allocate a new connection object
772 con = new Connection(this);
773 con->setServer(host, port);
774 d->poller.addChannel(con);
775 d->connections.insert(d->connections.end(),
776 ConnectionDict::value_type(connectionId, con));
779 con->queueRequest(r);
782 //------------------------------------------------------------------------------
783 FileRequestRef Client::save( const std::string& url,
784 const std::string& filename )
786 FileRequestRef req = new FileRequest(url, filename);
791 //------------------------------------------------------------------------------
792 MemoryRequestRef Client::load(const std::string& url)
794 MemoryRequestRef req = new MemoryRequest(url);
799 void Client::requestFinished(Connection* con)
804 void Client::setUserAgent(const std::string& ua)
809 const std::string& Client::userAgent() const
814 const std::string& Client::proxyHost() const
819 const std::string& Client::proxyAuth() const
824 void Client::setProxy( const std::string& proxy,
826 const std::string& auth )
833 bool Client::hasActiveRequests() const
835 ConnectionDict::const_iterator it = d->connections.begin();
836 for (; it != d->connections.end(); ++it) {
837 if (it->second->isActive()) return true;
843 void Client::receivedBytes(unsigned int count)
845 d->bytesTransferred += count;
846 d->totalBytesDownloaded += count;
849 unsigned int Client::transferRateBytesPerSec() const
851 unsigned int e = d->timeTransferSample.elapsedMSec();
853 // too long a window, ignore
854 d->timeTransferSample.stamp();
855 d->bytesTransferred = 0;
856 d->lastTransferRate = 0;
860 if (e < 100) { // avoid really narrow windows
861 return d->lastTransferRate;
864 unsigned int ratio = (d->bytesTransferred * 1000) / e;
865 // run a low-pass filter
866 unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
869 d->timeTransferSample.stamp();
870 d->bytesTransferred = 0;
871 d->lastTransferRate = smoothed;
875 uint64_t Client::totalBytesDownloaded() const
877 return d->totalBytesDownloaded;
880 } // of namespace HTTP
882 } // of namespace simgear