2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24 #include "HTTPClient.hxx"
25 #include "HTTPFileRequest.hxx"
29 #include <cstdlib> // rand()
35 #include <boost/foreach.hpp>
36 #include <boost/algorithm/string/case_conv.hpp>
38 #include <simgear/io/sg_netChat.hxx>
39 #include <simgear/io/HTTPContentDecode.hxx>
40 #include <simgear/misc/strutils.hxx>
41 #include <simgear/compiler.h>
42 #include <simgear/debug/logstream.hxx>
43 #include <simgear/timing/timestamp.hxx>
44 #include <simgear/structure/exception.hxx>
46 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
49 # if !defined(SIMGEAR_VERSION)
50 # define SIMGEAR_VERSION "simgear-development"
// Port a Connection is constructed with.
60 extern const int DEFAULT_HTTP_PORT = 80;
// Body type for which tryStartNextRequest() sends the request's query string
// as the message body instead of appending it to the request path.
61 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Limit on requests that were sent on one connection but are still unanswered.
62 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
// Connections keyed by the "<host>-<port>" id built in Client::makeRequest();
// a multimap, so several connections can share one id.
65 typedef std::multimap<std::string, Connection*> ConnectionDict;
// Ordered sequence of requests.
66 typedef std::list<Request_ptr> RequestList;
// Private state of HTTP::Client, accessed through the client's `d` pointer.
68 class Client::ClientPrivate
// User-Agent string (set up in the Client constructor via setUserAgent()).
71 std::string userAgent;
// presumably the value sent as Proxy-Authorization — accessor body not visible here
74 std::string proxyAuth;
// poller every Connection is registered with; polled from Client::update()
75 NetChannelPoller poller;
// upper bound on the number of entries in `connections`
76 unsigned int maxConnections;
// requests that could not be assigned to a connection yet; Client::update()
// re-submits them once a connection slot is free
78 RequestList pendingRequests;
80 // connections by host (potentially more than one)
81 ConnectionDict connections;
// start of the current transfer-rate sampling window
83 SGTimeStamp timeTransferSample;
// bytes received since the sampling window started
84 unsigned int bytesTransferred;
// most recent smoothed rate computed by transferRateBytesPerSec()
85 unsigned int lastTransferRate;
// running total of all bytes received
86 uint64_t totalBytesDownloaded;
89 class Connection : public NetChat
92 Connection(Client* pr) :
95 port(DEFAULT_HTTP_PORT)
// Override of NetChat::handleBufferRead(). As long as there is no active
// request, or the active request is not complete, the buffer is handed to the
// NetChat implementation unchanged. Otherwise the active request has been
// marked complete from outside (an abort) and the state is forced so that the
// normal completion logic can run.
103 virtual void handleBufferRead (NetBuffer& buffer)
105 if( !activeRequest || !activeRequest->isComplete() )
106 return NetChat::handleBufferRead(buffer);
108 // Request should be aborted (signaled by setting its state to complete).
110 // force the state to GETTING_BODY, to simplify logic in
111 // responseComplete and handleClose
112 state = STATE_GETTING_BODY;
// Set the server this connection talks to: host name `h` and port `p`.
// NOTE(review): `p` is a short, while Client::makeRequest() passes an int
// port. Where short is 16 bits, port numbers above 32767 do not survive the
// conversion — consider widening the parameter (and the stored port) to int
// or unsigned short. TODO confirm against the member declaration.
116 void setServer(const std::string& h, short p)
122 // socket-level errors
// Override of NetChat::handleError(). Requests affected by the error are
// failed with `error` and its strerror() text, the error is forwarded to the
// NetChat implementation, and the connection ends in STATE_SOCKET_ERROR.
123 virtual void handleError(int error)
125 const char* errStr = strerror(error);
128 // connection level failure, eg name lookup or routing
129 // we won't have an active request yet, so let's fail all of the
130 // requests since we presume it's a systematic failure for
131 // the host in question
132 BOOST_FOREACH(Request_ptr req, sentRequests) {
133 req->setFailure(error, errStr);
136 BOOST_FOREACH(Request_ptr req, queuedRequests) {
137 req->setFailure(error, errStr);
// everything on both lists has been failed above, so drop the entries
140 sentRequests.clear();
141 queuedRequests.clear();
144 NetChat::handleError(error);
// the request whose response was being received fails with the same error
146 activeRequest->setFailure(error, errStr);
147 activeRequest = NULL;
148 _contentDecoder.reset();
151 state = STATE_SOCKET_ERROR;
// Override of NetChat::handleClose(). After the NetChat implementation has
// run, the connection is put into STATE_CLOSED. A close that arrives while a
// body is being read for an active request is handled separately from the
// case where the active request has to be failed. Requests that were sent but
// not answered are moved back to the front of the queue.
154 virtual void handleClose()
156 NetChat::handleClose();
158 // closing of the connection from the server side when getting the body,
159 bool canCloseState = (state == STATE_GETTING_BODY);
160 if (canCloseState && activeRequest) {
161 // force state here, so responseComplete can avoid closing the
163 state = STATE_CLOSED;
// the active request cannot be completed: report it as failed
167 activeRequest->setFailure(500, "server closed connection");
168 // remove the failed request from sentRequests, so it does
170 RequestList::iterator it = std::find(sentRequests.begin(),
171 sentRequests.end(), activeRequest);
172 if (it != sentRequests.end()) {
173 sentRequests.erase(it);
175 activeRequest = NULL;
176 _contentDecoder.reset();
179 state = STATE_CLOSED;
182 if (sentRequests.empty()) {
186 // restore sent requests to the queue, so they will be re-sent
187 // when the connection opens again
// inserted at the front, i.e. ahead of requests that were never sent
188 queuedRequests.insert(queuedRequests.begin(),
189 sentRequests.begin(), sentRequests.end());
190 sentRequests.clear();
195 NetChat::handleError(ETIMEDOUT);
197 SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
198 activeRequest->setFailure(ETIMEDOUT, "socket timeout");
199 activeRequest = NULL;
200 _contentDecoder.reset();
203 state = STATE_SOCKET_ERROR;
// Append request `r` to the list of not-yet-sent requests and immediately
// try to start sending.
206 void queueRequest(const Request_ptr& r)
208 queuedRequests.push_back(r);
209 tryStartNextRequest();
214 assert(!sentRequests.empty());
215 assert(state == STATE_WAITING_FOR_RESPONSE);
217 activeRequest = sentRequests.front();
219 activeRequest->responseStart(buffer);
220 } catch (sg_exception& e) {
225 state = STATE_GETTING_HEADERS;
227 if (activeRequest->responseCode() == 204) {
228 noMessageBody = true;
229 } else if (activeRequest->method() == "HEAD") {
230 noMessageBody = true;
232 noMessageBody = false;
235 bodyTransferSize = -1;
236 chunkedTransfer = false;
237 _contentDecoder.reset();
// Try to send the request at the front of `queuedRequests`.
//
// Requests at the front of the queue that are already complete are dropped
// first. If a request remains and the in-flight limit allows it, the request
// line, the headers and any body are written to the socket (connecting first
// when the connection is in STATE_CLOSED). On success the request moves from
// `queuedRequests` to `sentRequests`, the state becomes
// STATE_WAITING_FOR_RESPONSE, and the function calls itself again to pipeline
// the next queued request.
240 void tryStartNextRequest()
// skip requests that were completed while still waiting in the queue
242 while( !queuedRequests.empty()
243 && queuedRequests.front()->isComplete() )
244 queuedRequests.pop_front();
246 if (queuedRequests.empty()) {
// NOTE(review): with `>` this guard only triggers once sentRequests holds
// MAX_INFLIGHT_REQUESTS + 1 entries, whereas shouldStartNext() tests
// `size() < MAX_INFLIGHT_REQUESTS`; `>=` would make the two agree.
251 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
255 if (state == STATE_CLOSED) {
256 if (!connectToHost()) {
// header lines are CRLF terminated
261 setTerminator("\r\n");
265 Request_ptr r = queuedRequests.front();
268 std::stringstream headerData;
269 std::string path = r->path();
270 assert(!path.empty());
271 std::string query = r->query();
272 std::string bodyData;
// when a proxy host is configured the request line carries the absolute URL
274 if (!client->proxyHost().empty()) {
275 path = r->scheme() + "://" + r->host() + r->path();
// URL-encoded requests send the query string as the body, not in the path
278 if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
279 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
// NOTE(review): std::string::substr(1) throws std::out_of_range when `query`
// is empty — confirm a URL-encoded request always has a query string.
280 bodyData = query.substr(1); // URL-encode, drop the leading '?'
281 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
282 headerData << "Content-Length:" << bodyData.size() << "\r\n";
284 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
285 if( r->hasBodyData() )
287 headerData << "Content-Length:" << r->bodyLength() << "\r\n";
288 headerData << "Content-Type:" << r->bodyType() << "\r\n";
292 headerData << "Host: " << r->hostAndPort() << "\r\n";
293 headerData << "User-Agent:" << client->userAgent() << "\r\n";
294 headerData << "Accept-Encoding: deflate, gzip\r\n";
295 if (!client->proxyAuth().empty()) {
296 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
// headers supplied by the request itself
299 BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
300 headerData << h.first << ": " << h.second << "\r\n";
303 headerData << "\r\n"; // final CRLF to terminate the headers
304 if (!bodyData.empty()) {
305 headerData << bodyData;
308 bool ok = push(headerData.str().c_str());
310 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
311 // we've over-stuffed the socket, give up for now, let things
312 // drain down before trying to start any more requests.
// a body provided by the request is sent in pieces of at most 4096 bytes
316 if( r->hasBodyData() )
317 for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
320 size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
323 if( !bufferSend(buf, len) )
327 "overflow the HTTP::Connection output buffer");
328 state = STATE_SOCKET_ERROR;
331 body_bytes_sent += len;
337 "HTTP asynchronous request body generation is unsupported");
342 // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
343 // "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
344 // "\n\t on connection " << this);
345 // successfully sent, remove from queue, and maybe send the next
346 queuedRequests.pop_front();
347 sentRequests.push_back(r);
348 state = STATE_WAITING_FOR_RESPONSE;
350 // pipelining, let's maybe send the next request right away
351 tryStartNextRequest();
// Override of NetChat::collectIncomingData(): `n` bytes starting at `s` have
// been received. The count is reported to the client, which keeps the
// transfer statistics. Bytes received while a body or the payload of a chunk
// is being read are passed on to the content decoder.
354 virtual void collectIncomingData(const char* s, int n)
357 client->receivedBytes(static_cast<unsigned int>(n));
359 if( (state == STATE_GETTING_BODY)
360 || (state == STATE_GETTING_CHUNKED_BYTES) )
361 _contentDecoder.receivedBytes(s, n);
// Override of NetChat::foundTerminator(): the terminator configured with
// setTerminator() / setByteCount() has been reached. What happens next is
// selected by the current connection state.
366 virtual void foundTerminator(void)
370 case STATE_WAITING_FOR_RESPONSE:
374 case STATE_GETTING_HEADERS:
379 case STATE_GETTING_BODY:
// a complete line while expecting a chunk header
383 case STATE_GETTING_CHUNKED:
384 processChunkHeader();
// all bytes of the current chunk have arrived: go back to reading
// CRLF-terminated chunk header lines
387 case STATE_GETTING_CHUNKED_BYTES:
388 setTerminator("\r\n");
389 state = STATE_GETTING_CHUNKED;
394 case STATE_GETTING_TRAILER:
// data arrived although no request is outstanding
400 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
407 bool hasIdleTimeout() const
409 if (state != STATE_IDLE) {
413 assert(sentRequests.empty());
414 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
417 bool hasErrorTimeout() const
419 if (state == STATE_IDLE) {
423 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
426 bool hasError() const
428 return (state == STATE_SOCKET_ERROR);
431 bool shouldStartNext() const
433 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
436 bool isActive() const
438 return !queuedRequests.empty() || !sentRequests.empty();
443 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
446 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
450 if (connect(host.c_str(), port) != 0) {
460 std::string h = strutils::simplify(buffer);
461 if (h.empty()) { // blank line terminates headers
466 int colonPos = buffer.find(':');
468 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
472 std::string key = strutils::simplify(buffer.substr(0, colonPos));
473 std::string lkey = boost::to_lower_copy(key);
474 std::string value = strutils::strip(buffer.substr(colonPos + 1));
476 // only consider these if getting headers (as opposed to trailers
477 // of a chunked transfer)
478 if (state == STATE_GETTING_HEADERS) {
479 if (lkey == "content-length") {
481 int sz = strutils::to_int(value);
482 if (bodyTransferSize <= 0) {
483 bodyTransferSize = sz;
485 activeRequest->setResponseLength(sz);
486 } else if (lkey == "transfer-length") {
487 bodyTransferSize = strutils::to_int(value);
488 } else if (lkey == "transfer-encoding") {
489 processTransferEncoding(value);
490 } else if (lkey == "content-encoding") {
491 _contentDecoder.setEncoding(value);
495 activeRequest->responseHeader(lkey, value);
498 void processTransferEncoding(const std::string& te)
500 if (te == "chunked") {
501 chunkedTransfer = true;
503 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Handle one line received while expecting a chunk header of a chunked
// transfer. The chunk size is parsed from `buffer` as a hexadecimal number;
// when the line contains a ';' only the part before it is parsed. A size of
// zero begins the trailer section; any other size switches to
// STATE_GETTING_CHUNKED_BYTES and asks for exactly that many bytes.
508 void processChunkHeader()
510 if (buffer.empty()) {
511 // blank line after chunk data
// position of the first ';' (start of chunk extensions), if any
516 int semiPos = buffer.find(';');
518 // extensions ignored for the moment
519 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
521 chunkSize = strutils::to_int(buffer, 16);
525 if (chunkSize == 0) { // trailer start
526 state = STATE_GETTING_TRAILER;
530 state = STATE_GETTING_CHUNKED_BYTES;
531 setByteCount(chunkSize);
// Handle one line of the trailer section that follows the last chunk of a
// chunked transfer. An empty line is treated separately from a non-empty
// one; a non-empty line is handled like a normal header line.
534 void processTrailer()
536 if (buffer.empty()) {
542 // process as a normal header
// Tell the active request that all of its response headers were received,
// initialise the content decoder from that request, and select the state in
// which the body is read.
546 void headersComplete()
548 activeRequest->responseHeadersComplete();
549 _contentDecoder.initWithRequest(activeRequest);
// chunked transfers continue with chunk header lines
551 if (chunkedTransfer) {
552 state = STATE_GETTING_CHUNKED;
// no body expected (per beginResponse flags or a body size of zero)
553 } else if (noMessageBody || (bodyTransferSize == 0)) {
554 // force the state to GETTING_BODY, to simplify logic in
555 // responseComplete and handleClose
556 state = STATE_GETTING_BODY;
// plain body: read bodyTransferSize bytes
559 setByteCount(bodyTransferSize); // may be -1, that's fine
560 state = STATE_GETTING_BODY;
// Finish the active request: finalise the content decoder, remove the request
// from the front of `sentRequests`, update the connection state, and only then
// notify the request and the client.
564 void responseComplete()
// keep the request alive; activeRequest is cleared before the notification
566 Request_ptr completedRequest = activeRequest;
567 _contentDecoder.finish();
// responses are matched to requests in the order the requests were sent
569 assert(sentRequests.front() == activeRequest);
570 sentRequests.pop_front();
571 bool doClose = activeRequest->closeAfterComplete();
572 activeRequest = NULL;
574 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
576 // this will bring us into handleClose() above, which updates
577 // state to STATE_CLOSED
580 // if we have additional requests waiting, try to start them now
581 tryStartNextRequest();
// still connected: idle when nothing is outstanding, otherwise wait for the
// response to the next pipelined request
585 if (state != STATE_CLOSED) {
586 state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
589 // notify request after we change state, so this connection is idle
590 // if completion triggers other requests (which is likely)
591 // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
592 completedRequest->responseComplete();
593 client->requestFinished(this);
// the next response starts with a CRLF-terminated status line
595 setTerminator("\r\n");
// States of the per-connection protocol state machine.
598 enum ConnectionState {
// request(s) sent, no response line received yet
600 STATE_WAITING_FOR_RESPONSE,
// reading response header lines
601 STATE_GETTING_HEADERS,
// chunked transfer: expecting a chunk header line
603 STATE_GETTING_CHUNKED,
// chunked transfer: reading the bytes of the current chunk
604 STATE_GETTING_CHUNKED_BYTES,
// chunked transfer: reading trailer lines after the last chunk
605 STATE_GETTING_TRAILER,
607 STATE_CLOSED ///< connection should be closed now
// request whose response is currently being received; NULL when there is none
611 Request_ptr activeRequest;
// current state of the protocol state machine
612 ConnectionState state;
// body size taken from the response headers; -1 until a header provides one
616 int bodyTransferSize;
// time stamp evaluated by hasIdleTimeout() and hasErrorTimeout()
617 SGTimeStamp idleTime;
// set when the response carries the transfer encoding "chunked"
618 bool chunkedTransfer;
// requests accepted by this connection but not yet written to the socket
621 RequestList queuedRequests;
// requests written to the socket whose response has not completed yet
622 RequestList sentRequests;
// decoder that receives the body bytes of the active response
624 ContentDecoder _contentDecoder;
631 d->maxConnections = 4;
632 d->bytesTransferred = 0;
633 d->lastTransferRate = 0;
634 d->timeTransferSample.stamp();
635 d->totalBytesDownloaded = 0;
637 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
// Set the maximum number of connections the client keeps at the same time.
// Throws sg_range_exception when `maxCon` is rejected as illegal (the exact
// check is not visible in this excerpt — TODO confirm the accepted range).
644 void Client::setMaxConnections(unsigned int maxCon)
647 throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
650 d->maxConnections = maxCon;
// Drive the client: service the sockets, retire connections that timed out or
// are no longer useful, let the remaining connections start queued requests,
// and re-submit requests that were waiting for a free connection slot.
// `waitTimeout` is passed to the poller, or used as a sleep time in
// milliseconds when the poller has no channels.
653 void Client::update(int waitTimeout)
// nothing to poll: sleep so the caller still waits for the requested time
655 if (!d->poller.hasChannels() && (waitTimeout > 0)) {
656 SGTimeStamp::sleepForMSec(waitTimeout);
658 d->poller.poll(waitTimeout);
661 bool waitingRequests = !d->pendingRequests.empty();
662 ConnectionDict::iterator it = d->connections.begin();
// the iterator is advanced inside the loop because entries may be erased
663 for (; it != d->connections.end(); ) {
664 Connection* con = it->second;
665 if (con->hasIdleTimeout() ||
667 con->hasErrorTimeout() ||
// an inactive connection is given up when other requests wait for a slot
668 (!con->isActive() && waitingRequests))
670 if (con->hasErrorTimeout()) {
671 // tell the connection we're timing it out
672 con->handleTimeout();
675 // connection has been idle for a while, clean it up
676 // (or if we have requests waiting for a different host,
677 // or an error condition
// step past the entry before erasing it so `it` stays valid
678 ConnectionDict::iterator del = it++;
680 d->connections.erase(del);
682 if (it->second->shouldStartNext()) {
683 it->second->tryStartNextRequest();
687 } // of connection iteration
689 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
// work on a copy: re-submitted requests may be appended to pendingRequests again
690 RequestList waiting(d->pendingRequests);
691 d->pendingRequests.clear();
693 // re-submit all waiting requests in order; this takes care of
694 // finding multiple pending items targeted to the same (new)
696 BOOST_FOREACH(Request_ptr req, waiting) {
702 void Client::makeRequest(const Request_ptr& r)
704 if( r->isComplete() )
707 if( r->url().find("://") == std::string::npos ) {
708 r->setFailure(EINVAL, "malformed URL");
712 if( r->url().find("http://") != 0 ) {
713 r->setFailure(EINVAL, "only HTTP protocol is supported");
717 std::string host = r->host();
718 int port = r->port();
719 if (!d->proxy.empty()) {
724 Connection* con = NULL;
725 std::stringstream ss;
726 ss << host << "-" << port;
727 std::string connectionId = ss.str();
728 bool havePending = !d->pendingRequests.empty();
729 bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
730 ConnectionDict::iterator consEnd = d->connections.end();
732 // assign request to an existing Connection.
733 // various options exist here, examined in order
734 ConnectionDict::iterator it = d->connections.find(connectionId);
735 if (atConnectionsLimit && (it == consEnd)) {
736 // maximum number of connections active, queue this request
737 // when a connection goes inactive, we'll start this one
738 d->pendingRequests.push_back(r);
742 // scan for an idle Connection to the same host (likely if we're
743 // retrieving multiple resources from the same host in quick succession)
744 // if we have pending requests (waiting for a free Connection), then
745 // force new requests on this id to always use the first Connection
746 // (instead of the random selection below). This ensures that when
747 // there's pressure on the number of connections to keep alive, one
748 // host can't DoS every other.
750 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
751 if (havePending || !it->second->isActive()) {
757 if (!con && atConnectionsLimit) {
758 // all current connections are busy (active), and we don't
759 // have free connections to allocate, so let's assign to
760 // an existing one randomly. Ideally we'd used whichever one will
761 // complete first but we don't have that info.
762 int index = rand() % count;
763 for (it = d->connections.find(connectionId); index > 0; --index) { ; }
767 // allocate a new connection object
769 con = new Connection(this);
770 con->setServer(host, port);
771 d->poller.addChannel(con);
772 d->connections.insert(d->connections.end(),
773 ConnectionDict::value_type(connectionId, con));
776 con->queueRequest(r);
779 //------------------------------------------------------------------------------
// Create a FileRequest for `url` whose target is the file `filename`.
// NOTE(review): the remaining statements are not visible here — presumably
// the request is submitted and returned to the caller; confirm.
780 FileRequestRef Client::save( const std::string& url,
781 const std::string& filename )
783 FileRequestRef req = new FileRequest(url, filename);
788 //------------------------------------------------------------------------------
// Create a MemoryRequest for `url`.
// NOTE(review): the remaining statements are not visible here — presumably
// the request is submitted and returned to the caller; confirm.
789 MemoryRequestRef Client::load(const std::string& url)
791 MemoryRequestRef req = new MemoryRequest(url);
796 void Client::requestFinished(Connection* con)
801 void Client::setUserAgent(const std::string& ua)
806 const std::string& Client::userAgent() const
811 const std::string& Client::proxyHost() const
816 const std::string& Client::proxyAuth() const
821 void Client::setProxy( const std::string& proxy,
823 const std::string& auth )
830 bool Client::hasActiveRequests() const
832 ConnectionDict::const_iterator it = d->connections.begin();
833 for (; it != d->connections.end(); ++it) {
834 if (it->second->isActive()) return true;
840 void Client::receivedBytes(unsigned int count)
842 d->bytesTransferred += count;
843 d->totalBytesDownloaded += count;
846 unsigned int Client::transferRateBytesPerSec() const
848 unsigned int e = d->timeTransferSample.elapsedMSec();
850 // too long a window, ignore
851 d->timeTransferSample.stamp();
852 d->bytesTransferred = 0;
853 d->lastTransferRate = 0;
857 if (e < 100) { // avoid really narrow windows
858 return d->lastTransferRate;
861 unsigned int ratio = (d->bytesTransferred * 1000) / e;
862 // run a low-pass filter
863 unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
866 d->timeTransferSample.stamp();
867 d->bytesTransferred = 0;
868 d->lastTransferRate = smoothed;
872 uint64_t Client::totalBytesDownloaded() const
874 return d->totalBytesDownloaded;
877 } // of namespace HTTP
879 } // of namespace simgear