2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24 #include "HTTPClient.hxx"
28 #include <cstdlib> // rand()
35 #include <boost/foreach.hpp>
36 #include <boost/algorithm/string/case_conv.hpp>
38 #include <simgear/io/sg_netChat.hxx>
39 #include <simgear/io/HTTPContentDecode.hxx>
40 #include <simgear/misc/strutils.hxx>
41 #include <simgear/compiler.h>
42 #include <simgear/debug/logstream.hxx>
43 #include <simgear/timing/timestamp.hxx>
44 #include <simgear/structure/exception.hxx>
46 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
49 # if !defined(SIMGEAR_VERSION)
50 # define SIMGEAR_VERSION "simgear-development"
55 using std::stringstream;
// Port a Connection is initialised with (see the Connection constructor).
64 extern const int DEFAULT_HTTP_PORT = 80;
// Request body type for which tryStartNextRequest() sends the URL query
// string as the request body instead of appending it to the path.
65 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Limit on the number of requests sent on one connection that are still
// awaiting their response (compared against sentRequests.size()).
66 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
// Connections keyed by a "host-port" id string (built in Client::makeRequest);
// a multimap, so several connections can share one id.
69 typedef std::multimap<std::string, Connection*> ConnectionDict;
// Ordered list of requests.
70 typedef std::list<Request_ptr> RequestList;
// Private state of HTTP::Client, reached through the client's `d` pointer.
72 class Client::ClientPrivate
// presumably the value returned by Client::userAgent() -- confirm
75 std::string userAgent;
// presumably the value returned by Client::proxyAuth() -- confirm
78 std::string proxyAuth;
// polled from Client::update(); every new Connection is added as a channel
79 NetChannelPoller poller;
// upper bound on the number of entries in `connections`
80 unsigned int maxConnections;
// requests that could not be assigned to a connection yet; they are
// re-submitted from Client::update()
82 RequestList pendingRequests;
84 // connections by host (potentially more than one)
85 ConnectionDict connections;
// start of the current transfer-rate sampling window
87 SGTimeStamp timeTransferSample;
// bytes received since the sampling window started
88 unsigned int bytesTransferred;
// last computed transfer rate
89 unsigned int lastTransferRate;
// running total of all bytes reported through Client::receivedBytes()
90 uint64_t totalBytesDownloaded;
93 class Connection : public NetChat
// Create a connection for client `pr`; the port is initialised to
// DEFAULT_HTTP_PORT.
96 Connection(Client* pr) :
99 port(DEFAULT_HTTP_PORT)
// Virtual destructor (Connection derives from NetChat).
103 virtual ~Connection()
// Set the server this connection talks to: host name `h` and port `p`.
// NOTE(review): `p` is a short while Client::makeRequest() passes an int
// port; large port numbers may not survive the conversion -- confirm.
107 void setServer(const string& h, short p)
113 // socket-level errors
// `error` is the error code reported for the socket. ENOENT is treated as a
// host name lookup failure and fails every request held by this connection;
// the error is then passed on to NetChat, the active request (if any) is
// failed and the connection moves to STATE_SOCKET_ERROR.
114 virtual void handleError(int error)
116 if (error == ENOENT) {
117 // name lookup failure
118 // we won't have an active request yet, so the logic below won't
119 // fire to actually call setFailure. Let's fail all of the requests
120 BOOST_FOREACH(Request_ptr req, sentRequests) {
121 req->setFailure(error, "hostname lookup failure");
124 BOOST_FOREACH(Request_ptr req, queuedRequests) {
125 req->setFailure(error, "hostname lookup failure");
128 // name lookup failure, abandon all requests on this connection
129 sentRequests.clear();
130 queuedRequests.clear();
// let the base class do its own error handling
133 NetChat::handleError(error);
135 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
136 activeRequest->setFailure(error, "socket error");
137 activeRequest = NULL;
// drop any partially decoded response content
138 _contentDecoder.reset();
141 state = STATE_SOCKET_ERROR;
// Called when the socket is closed. If a response body was being received,
// the active request is failed with code 500 and removed from sentRequests;
// any other sent-but-unanswered requests are moved back to the front of the
// queue so they are sent again on the next connection.
144 virtual void handleClose()
146 NetChat::handleClose();
148 // closing of the connection from the server side when getting the body,
149 bool canCloseState = (state == STATE_GETTING_BODY);
150 if (canCloseState && activeRequest) {
151 // force state here, so responseComplete can avoid closing the
153 state = STATE_CLOSED;
157 activeRequest->setFailure(500, "server closed connection");
158 // remove the failed request from sentRequests, so it does
160 RequestList::iterator it = std::find(sentRequests.begin(),
161 sentRequests.end(), activeRequest);
162 if (it != sentRequests.end()) {
163 sentRequests.erase(it);
165 activeRequest = NULL;
166 _contentDecoder.reset();
169 state = STATE_CLOSED;
172 if (sentRequests.empty()) {
176 // restore sent requests to the queue, so they will be re-sent
177 // when the connection opens again
// inserted at the front, so they stay ahead of requests queued later
178 queuedRequests.insert(queuedRequests.begin(),
179 sentRequests.begin(), sentRequests.end());
180 sentRequests.clear();
// Timeout handling: the timeout is reported to NetChat as ETIMEDOUT, the
// active request is failed with ETIMEDOUT and the connection is put into
// STATE_SOCKET_ERROR.
// NOTE(review): presumably the body of handleTimeout(), which
// Client::update() calls when hasErrorTimeout() is true -- confirm.
185 NetChat::handleError(ETIMEDOUT);
187 SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
188 activeRequest->setFailure(ETIMEDOUT, "socket timeout");
189 activeRequest = NULL;
190 _contentDecoder.reset();
193 state = STATE_SOCKET_ERROR;
// Append request `r` to this connection's queue and try to start sending
// queued requests straight away.
196 void queueRequest(const Request_ptr& r)
198 queuedRequests.push_back(r);
199 tryStartNextRequest();
// Start of a response: the oldest sent request becomes the active request
// and is handed the current buffer contents through responseStart(); the
// per-response parsing state is then reset.
// Precondition (asserted): a request has been sent and the connection is in
// STATE_WAITING_FOR_RESPONSE.
204 assert(!sentRequests.empty());
205 assert(state == STATE_WAITING_FOR_RESPONSE);
207 activeRequest = sentRequests.front();
209 activeRequest->responseStart(buffer);
210 state = STATE_GETTING_HEADERS;
// responses with code 204 and responses to HEAD requests are flagged as
// having no message body
212 if (activeRequest->responseCode() == 204) {
213 noMessageBody = true;
214 } else if (activeRequest->method() == "HEAD") {
215 noMessageBody = true;
217 noMessageBody = false;
// -1: body size not known yet
220 bodyTransferSize = -1;
221 chunkedTransfer = false;
222 _contentDecoder.reset();
// Send the request at the head of queuedRequests, if there is one and the
// pipelining limit allows it. Connects to the host first when the connection
// is in STATE_CLOSED. The request line, headers and (for URL-encoded bodies)
// the body are pushed to the socket in one go; other request bodies are
// streamed from Request::getBodyData(). On success the request moves from
// queuedRequests to sentRequests and the function recurses to pipeline the
// next queued request.
225 void tryStartNextRequest()
227 if (queuedRequests.empty()) {
// Pipelining limit. `>=` (rather than `>`) keeps this check consistent with
// shouldStartNext(), which only allows a new request while
// sentRequests.size() < MAX_INFLIGHT_REQUESTS.
232 if (sentRequests.size() >= MAX_INFLIGHT_REQUESTS) {
236 if (state == STATE_CLOSED) {
237 if (!connectToHost()) {
// responses are parsed line by line
241 setTerminator("\r\n");
245 Request_ptr r = queuedRequests.front();
247 requestBodyBytesToSend = r->requestBodyLength();
249 stringstream headerData;
250 string path = r->path();
251 assert(!path.empty());
252 string query = r->query();
// when going through a proxy, the request line carries an absolute URL
// NOTE(review): this uses host() while the Host header below uses
// hostAndPort(); check whether the port is meant to be included here.
255 if (!client->proxyHost().empty()) {
256 path = r->scheme() + "://" + r->host() + r->path();
259 if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
260 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
// guard against an empty query: substr(1) on an empty string throws
// std::out_of_range
261 bodyData = query.empty() ? string() : query.substr(1); // URL-encode, drop the leading '?'
262 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
263 headerData << "Content-Length:" << bodyData.size() << "\r\n";
265 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
266 if (requestBodyBytesToSend >= 0) {
267 headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
268 headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
272 headerData << "Host: " << r->hostAndPort() << "\r\n";
273 headerData << "User-Agent:" << client->userAgent() << "\r\n";
274 headerData << "Accept-Encoding: deflate, gzip\r\n";
275 if (!client->proxyAuth().empty()) {
276 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
// headers set on the request itself
279 BOOST_FOREACH(string h, r->requestHeaders()) {
280 headerData << h << ": " << r->header(h) << "\r\n";
283 headerData << "\r\n"; // final CRLF to terminate the headers
284 if (!bodyData.empty()) {
285 headerData << bodyData;
288 bool ok = push(headerData.str().c_str());
290 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
291 // we've over-stuffed the socket, give up for now, let things
292 // drain down before trying to start any more requests.
// stream the request body in blocks of up to 4096 bytes
296 while (requestBodyBytesToSend > 0) {
298 int len = r->getBodyData(buf, 4096);
300 requestBodyBytesToSend -= len;
301 if (!bufferSend(buf, len)) {
302 SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
303 state = STATE_SOCKET_ERROR;
306 // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
308 SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
313 // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
314 // "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
315 // "\n\t on connection " << this);
316 // successfully sent, remove from queue, and maybe send the next
317 queuedRequests.pop_front();
318 sentRequests.push_back(r);
319 state = STATE_WAITING_FOR_RESPONSE;
321 // pipelining, let's maybe send the next request right away
322 tryStartNextRequest();
// Receives `n` bytes of incoming data at `s`. The byte count is reported to
// the client for transfer statistics. While a body (or chunk payload) is
// being received the data goes to the content decoder; otherwise it is
// appended to the line buffer.
325 virtual void collectIncomingData(const char* s, int n)
328 client->receivedBytes(static_cast<unsigned int>(n));
330 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
331 _contentDecoder.receivedBytes(s, n);
333 buffer += string(s, n);
// Called when the current terminator (or requested byte count) has been
// reached in the incoming data; dispatches on the connection state.
337 virtual void foundTerminator(void)
341 case STATE_WAITING_FOR_RESPONSE:
345 case STATE_GETTING_HEADERS:
350 case STATE_GETTING_BODY:
354 case STATE_GETTING_CHUNKED:
355 processChunkHeader();
// chunk payload fully received: go back to line mode and expect the next
// chunk header
358 case STATE_GETTING_CHUNKED_BYTES:
359 setTerminator("\r\n");
360 state = STATE_GETTING_CHUNKED;
365 case STATE_GETTING_TRAILER:
371 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
// Idle-timeout check: for a connection in STATE_IDLE, true once idleTime
// reports more than ten seconds elapsed.
378 bool hasIdleTimeout() const
380 if (state != STATE_IDLE) {
// an idle connection must not have requests awaiting a response
384 assert(sentRequests.empty());
385 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
// Error-timeout check for a connection that is not idle: true once idleTime
// reports more than 30 seconds elapsed.
388 bool hasErrorTimeout() const
390 if (state == STATE_IDLE) {
394 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
// True when the connection is in STATE_SOCKET_ERROR.
397 bool hasError() const
399 return (state == STATE_SOCKET_ERROR);
// True when a request is queued and fewer than MAX_INFLIGHT_REQUESTS
// requests are awaiting a response on this connection.
402 bool shouldStartNext() const
404 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
// True when this connection holds any request, queued or already sent.
407 bool isActive() const
409 return !queuedRequests.empty() || !sentRequests.empty();
// Open the socket and connect to host:port; tryStartNextRequest() calls
// connectToHost() when the connection is in STATE_CLOSED and gives up if it
// reports failure.
414 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
417 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
// a non-zero result from connect() is the failure case
421 if (connect(host.c_str(), port) != 0) {
// Parse one header line held in `buffer`. A blank line terminates the
// headers. Otherwise the line is split at the first ':' into a key (lower-
// cased) and a value; framing-related headers update the connection's
// parsing state, and the header is passed to the active request.
// NOTE(review): presumably the body of the header-line handler used for both
// response headers and chunked-transfer trailers -- confirm.
431 string h = strutils::simplify(buffer);
432 if (h.empty()) { // blank line terminates headers
// find() returns npos when there is no ':'; it is stored in an int here
437 int colonPos = buffer.find(':');
439 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
443 string key = strutils::simplify(buffer.substr(0, colonPos));
444 string lkey = boost::to_lower_copy(key);
445 string value = strutils::strip(buffer.substr(colonPos + 1));
447 // only consider these if getting headers (as opposed to trailers
448 // of a chunked transfer)
449 if (state == STATE_GETTING_HEADERS) {
450 if (lkey == "content-length") {
452 int sz = strutils::to_int(value);
// only used as the transfer size if no positive size has been set already
453 if (bodyTransferSize <= 0) {
454 bodyTransferSize = sz;
456 activeRequest->setResponseLength(sz);
457 } else if (lkey == "transfer-length") {
458 bodyTransferSize = strutils::to_int(value);
459 } else if (lkey == "transfer-encoding") {
460 processTransferEncoding(value);
461 } else if (lkey == "content-encoding") {
462 _contentDecoder.setEncoding(value);
// the request is given the lower-cased key
466 activeRequest->responseHeader(lkey, value);
// Handle the value `te` of a Transfer-Encoding header: "chunked" switches
// the connection to chunked transfer; any other value is logged as
// unsupported.
469 void processTransferEncoding(const string& te)
471 if (te == "chunked") {
472 chunkedTransfer = true;
474 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Parse a chunk-size line held in `buffer`. The size is read as a
// hexadecimal number, ignoring anything after a ';'. A size of zero starts
// the trailer; otherwise the connection switches to reading exactly
// chunkSize bytes of chunk data.
479 void processChunkHeader()
481 if (buffer.empty()) {
482 // blank line after chunk data
487 int semiPos = buffer.find(';');
489 // extensions ignored for the moment
490 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
492 chunkSize = strutils::to_int(buffer, 16);
496 if (chunkSize == 0) { // trailer start
497 state = STATE_GETTING_TRAILER;
501 state = STATE_GETTING_CHUNKED_BYTES;
502 setByteCount(chunkSize);
// Handle one trailer line of a chunked transfer; an empty buffer is a
// special case, any other line is processed like a normal header.
505 void processTrailer()
507 if (buffer.empty()) {
513 // process as a normal header
// Called once all response headers have been read: notifies the active
// request, initialises the content decoder from it and selects the state
// used to receive the body.
517 void headersComplete()
519 activeRequest->responseHeadersComplete();
520 _contentDecoder.initWithRequest(activeRequest);
522 if (chunkedTransfer) {
523 state = STATE_GETTING_CHUNKED;
524 } else if (noMessageBody || (bodyTransferSize == 0)) {
525 // force the state to GETTING_BODY, to simplify logic in
526 // responseComplete and handleClose
527 state = STATE_GETTING_BODY;
530 setByteCount(bodyTransferSize); // may be -1, that's fine
531 state = STATE_GETTING_BODY;
// Finish the active response: flush the content decoder, remove the request
// from the front of sentRequests, update the connection state, then notify
// the request and the client.
535 void responseComplete()
// keep a reference, since activeRequest is cleared before the request is
// notified
537 Request_ptr completedRequest = activeRequest;
538 _contentDecoder.finish();
// the completed response must belong to the oldest sent request
540 assert(sentRequests.front() == activeRequest);
541 sentRequests.pop_front();
542 bool doClose = activeRequest->closeAfterComplete();
543 activeRequest = NULL;
545 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
547 // this will bring us into handleClose() above, which updates
548 // state to STATE_CLOSED
551 // if we have additional requests waiting, try to start them now
552 tryStartNextRequest();
556 if (state != STATE_CLOSED) {
557 state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
560 // notify request after we change state, so this connection is idle
561 // if completion triggers other requests (which is likely)
562 // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
563 completedRequest->responseComplete();
564 client->requestFinished(this);
// back to line mode
566 setTerminator("\r\n");
// States of the per-connection response state machine.
569 enum ConnectionState {
// at least one request has been sent and no response has started yet
571 STATE_WAITING_FOR_RESPONSE,
// reading response header lines
572 STATE_GETTING_HEADERS,
// expecting a chunk-size line of a chunked transfer
574 STATE_GETTING_CHUNKED,
// reading the payload bytes of one chunk
575 STATE_GETTING_CHUNKED_BYTES,
// reading trailer lines after the final (zero-size) chunk
576 STATE_GETTING_TRAILER,
578 STATE_CLOSED ///< connection should be closed now
// request whose response is currently being received (NULL when none)
582 Request_ptr activeRequest;
// current state of the response state machine
583 ConnectionState state;
// number of body bytes to read; -1 when not known
587 int bodyTransferSize;
// timer consulted by hasIdleTimeout() and hasErrorTimeout()
588 SGTimeStamp idleTime;
// true when the response uses chunked transfer encoding
589 bool chunkedTransfer;
// remaining request-body bytes to send for the request being started
591 int requestBodyBytesToSend;
// requests waiting to be sent on this connection
593 RequestList queuedRequests;
// requests already sent, oldest first, whose responses are outstanding
594 RequestList sentRequests;
// receives and decodes response body data
596 ContentDecoder _contentDecoder;
// Initial client state: at most four connections, transfer statistics
// zeroed and the sampling window started now.
603 d->maxConnections = 4;
604 d->bytesTransferred = 0;
605 d->lastTransferRate = 0;
606 d->timeTransferSample.stamp();
607 d->totalBytesDownloaded = 0;
// default User-Agent is "SimGear-" followed by the SimGear version
609 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
// Set the maximum number of connections kept by this client.
// Throws sg_range_exception for an illegal value.
616 void Client::setMaxConnections(unsigned int maxCon)
619 throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
622 d->maxConnections = maxCon;
// Drive the client: poll the sockets (passing `waitTimeout` to the poller),
// retire connections that timed out or are unused while requests are
// waiting, let the remaining connections start queued requests, and
// re-submit pending requests when there is room for another connection.
625 void Client::update(int waitTimeout)
627 d->poller.poll(waitTimeout);
628 bool waitingRequests = !d->pendingRequests.empty();
630 ConnectionDict::iterator it = d->connections.begin();
// no increment in the loop header: the iterator is advanced inside the body
631 for (; it != d->connections.end(); ) {
632 Connection* con = it->second;
633 if (con->hasIdleTimeout() ||
635 con->hasErrorTimeout() ||
636 (!con->isActive() && waitingRequests))
638 if (con->hasErrorTimeout()) {
639 // tell the connection we're timing it out
640 con->handleTimeout();
643 // connection has been idle for a while, clean it up
644 // (or if we have requests waiting for a different host,
645 // or an error condition)
// advance before erasing so the loop iterator stays valid
646 ConnectionDict::iterator del = it++;
648 d->connections.erase(del);
650 if (it->second->shouldStartNext()) {
651 it->second->tryStartNextRequest();
655 } // of connection iteration
657 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
// work on a copy: re-submitting may put requests back on pendingRequests
658 RequestList waiting(d->pendingRequests);
659 d->pendingRequests.clear();
661 // re-submit all waiting requests in order; this takes care of
662 // finding multiple pending items targeted to the same (new)
664 BOOST_FOREACH(Request_ptr req, waiting) {
// Submit request `r`. Requests whose URL has no "://" or does not start with
// "http://" are failed with EINVAL. Otherwise the request is assigned to a
// connection for its host and port (or for the proxy, when one is set): an
// existing one where possible, a new one while below the connection limit.
// When the limit is reached and no connection exists for the target, the
// request is parked on pendingRequests.
670 void Client::makeRequest(const Request_ptr& r)
672 if( r->url().find("://") == std::string::npos ) {
673 r->setFailure(EINVAL, "malformed URL");
677 if( r->url().find("http://") != 0 ) {
678 r->setFailure(EINVAL, "only HTTP protocol is supported");
682 string host = r->host();
683 int port = r->port();
684 if (!d->proxy.empty()) {
689 Connection* con = NULL;
// connections are keyed by "host-port"
691 ss << host << "-" << port;
692 string connectionId = ss.str();
693 bool havePending = !d->pendingRequests.empty();
694 bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
695 ConnectionDict::iterator consEnd = d->connections.end();
697 // assign request to an existing Connection.
698 // various options exist here, examined in order
699 ConnectionDict::iterator it = d->connections.find(connectionId);
700 if (atConnectionsLimit && (it == consEnd)) {
701 // maximum number of connections active, queue this request
702 // when a connection goes inactive, we'll start this one
703 d->pendingRequests.push_back(r);
707 // scan for an idle Connection to the same host (likely if we're
708 // retrieving multiple resources from the same host in quick succession)
709 // if we have pending requests (waiting for a free Connection), then
710 // force new requests on this id to always use the first Connection
711 // (instead of the random selection below). This ensures that when
712 // there's pressure on the number of connections to keep alive, one
713 // host can't DoS every other.
// `count` ends up as the number of connections examined for this id
715 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
716 if (havePending || !it->second->isActive()) {
722 if (!con && atConnectionsLimit) {
723 // all current connections are busy (active), and we don't
724 // have free connections to allocate, so let's assign to
725 // an existing one randomly. Ideally we'd use whichever one will
726 // complete first but we don't have that info.
727 int index = rand() % count;
// step `index` entries into the run of connections with this id; the
// iterator has to advance with the counter, otherwise the first
// connection is always selected
728 for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
732 // allocate a new connection object
734 con = new Connection(this);
735 con->setServer(host, port);
736 d->poller.addChannel(con);
737 d->connections.insert(d->connections.end(),
738 ConnectionDict::value_type(connectionId, con));
741 con->queueRequest(r);
// Notification from connection `con`, sent at the end of
// Connection::responseComplete(), that one of its requests has completed.
744 void Client::requestFinished(Connection* con)
// Set the user agent string `ua`; the Client constructor calls this with
// "SimGear-" followed by the SimGear version.
749 void Client::setUserAgent(const string& ua)
// User agent string; connections send it in the "User-Agent" request header.
754 const std::string& Client::userAgent() const
// Proxy host; when it is non-empty, connections put an absolute URL into the
// request line.
759 const std::string& Client::proxyHost() const
// Proxy authorization value; when it is non-empty, connections send it in a
// "Proxy-Authorization" request header.
764 const std::string& Client::proxyAuth() const
// Configure the proxy: host `proxy`, port `port` and authorization string
// `auth`.
769 void Client::setProxy(const string& proxy, int port, const string& auth)
// True if any connection still holds a queued or sent request.
776 bool Client::hasActiveRequests() const
778 ConnectionDict::const_iterator it = d->connections.begin();
779 for (; it != d->connections.end(); ++it) {
780 if (it->second->isActive()) return true;
// Record `count` received bytes, both in the current transfer-rate sampling
// window and in the running download total. Called by connections from
// collectIncomingData().
786 void Client::receivedBytes(unsigned int count)
788 d->bytesTransferred += count;
789 d->totalBytesDownloaded += count;
// Smoothed transfer rate in bytes per second. The rate of the current
// sampling window is blended with the previously stored rate, weighted by
// the window length in milliseconds (out of 400); the window is then
// restarted. Windows shorter than 100 ms return the previous rate unchanged.
// Note: although const, this updates the sampling state held in `d`.
792 unsigned int Client::transferRateBytesPerSec() const
794 unsigned int e = d->timeTransferSample.elapsedMSec();
796 // too long a window, ignore
797 d->timeTransferSample.stamp();
798 d->bytesTransferred = 0;
799 d->lastTransferRate = 0;
803 if (e < 100) { // avoid really narrow windows
804 return d->lastTransferRate;
// 64-bit intermediates: in 32 bits, bytesTransferred * 1000 wraps above
// about 4 MB per window and (400 - e) * lastTransferRate wraps at rates of
// roughly 10 MB/s
807 uint64_t ratio = (static_cast<uint64_t>(d->bytesTransferred) * 1000) / e;
808 // run a low-pass filter
809 uint64_t smoothed = ((400 - e) * static_cast<uint64_t>(d->lastTransferRate)) + (e * ratio);
812 d->timeTransferSample.stamp();
813 d->bytesTransferred = 0;
814 d->lastTransferRate = static_cast<unsigned int>(smoothed);
// Total number of bytes reported through receivedBytes() so far.
818 uint64_t Client::totalBytesDownloaded() const
820 return d->totalBytesDownloaded;
823 } // of namespace HTTP
825 } // of namespace simgear