2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24 #include "HTTPClient.hxx"
28 #include <cstdlib> // rand()
34 #include <boost/foreach.hpp>
35 #include <boost/algorithm/string/case_conv.hpp>
39 #include <simgear/io/sg_netChat.hxx>
40 #include <simgear/io/lowlevel.hxx>
41 #include <simgear/misc/strutils.hxx>
42 #include <simgear/compiler.h>
43 #include <simgear/debug/logstream.hxx>
44 #include <simgear/timing/timestamp.hxx>
45 #include <simgear/structure/exception.hxx>
47 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
50 # if !defined(SIMGEAR_VERSION)
51 # define SIMGEAR_VERSION "simgear-development"
56 using std::stringstream;
// File-level tunables and protocol constants used by Connection and Client.
// DEFAULT_HTTP_PORT is the only one declared extern (external linkage); it
// is the initial value of Connection::port.
65 extern const int DEFAULT_HTTP_PORT = 80;
// Request body type for which tryStartNextRequest() sends the query string
// (minus its first character) as the request body.
66 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Bound on sent-but-unanswered requests per Connection (see
// tryStartNextRequest() and shouldStartNext()).
67 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
// Size in bytes of the buffer that inflate() writes decompressed data into.
68 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
// windowBits argument passed to inflateInit2(). Per the zlib manual a
// negative value selects a raw deflate stream with no zlib/gzip wrapper.
69 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
71 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
72 // detailed description of the logic
// ID1, ID2 and CM: the first three bytes of a gzip member header.
73 const int GZIP_HEADER_ID1 = 31;
74 const int GZIP_HEADER_ID2 = 139;
75 const int GZIP_HEADER_METHOD_DEFLATE = 8;
// Length in bytes of the fixed part of the gzip header.
76 const unsigned int GZIP_HEADER_SIZE = 10;
// Bit masks tested against the FLG byte (header byte 3). These are flag
// bits, not byte offsets.
77 const int GZIP_HEADER_FEXTRA = 1 << 2;
78 const int GZIP_HEADER_FNAME = 1 << 3;
79 const int GZIP_HEADER_COMMENT = 1 << 4;
80 const int GZIP_HEADER_CRC = 1 << 1;
// Connections keyed by a string id; a multimap because several connections
// may share the same key (Client::makeRequest builds the key as "host-port").
83 typedef std::multimap<std::string, Connection*> ConnectionDict;
// Ordered list of requests; used for the pending, queued and sent queues.
84 typedef std::list<Request_ptr> RequestList;
// Private state of HTTP::Client, reached through the `d` pointer.
// NOTE(review): some member declarations are not visible in this listing.
86 class Client::ClientPrivate
89 std::string userAgent;
92 std::string proxyAuth;
// Polled once per Client::update() call.
93 NetChannelPoller poller;
// Upper bound on the size of `connections`; Client::makeRequest queues
// requests in pendingRequests once it is reached.
94 unsigned int maxConnections;
// Requests that could not be assigned to a connection yet.
96 RequestList pendingRequests;
98 // connections by host (potentially more than one)
99 ConnectionDict connections;
// Transfer-rate sampling state: start of the current sample window, bytes
// received during it, and the last computed rate.
101 SGTimeStamp timeTransferSample;
102 unsigned int bytesTransferred;
103 unsigned int lastTransferRate;
106 class Connection : public NetChat
// Construct a connection on behalf of client `pr`.
// The visible initialisers select the default HTTP port and leave both zlib
// buffers unallocated (they are malloc'd on demand elsewhere in the class).
// NOTE(review): further initialisers exist on lines not visible here.
109 Connection(Client* pr) :
112 port(DEFAULT_HTTP_PORT),
113 zlibInflateBuffer(NULL),
114 zlibInflateBufferSize(0),
115 zlibOutputBuffer(NULL)
// Release the two zlib work buffers. They are obtained with malloc(), hence
// free() rather than delete. (free(NULL) is a no-op, so the null checks are
// defensive only.)
120 virtual ~Connection()
122 if (zlibInflateBuffer) {
123 free(zlibInflateBuffer);
126 if (zlibOutputBuffer) {
127 free(zlibOutputBuffer);
// Set the host name `h` and TCP port `p` this connection talks to.
// NOTE(review): `p` is a short, but Client::makeRequest passes an int port.
// Where short is 16 bits, ports above 32767 do not survive the conversion;
// consider widening the parameter to int or unsigned short.
131 void setServer(const string& h, short p)
137 // socket-level errors
// Called with the socket error code. ENOENT is treated as a host-name lookup
// failure: every sent and queued request is failed and both lists are
// cleared. The error is then forwarded to NetChat and the connection moves
// to STATE_SOCKET_ERROR.
138 virtual void handleError(int error)
140 if (error == ENOENT) {
141 // name lookup failure
142 // we won't have an active request yet, so the logic below won't
143 // fire to actually call setFailure. Let's fail all of the requests
144 BOOST_FOREACH(Request_ptr req, sentRequests) {
145 req->setFailure(error, "hostname lookup failure");
148 BOOST_FOREACH(Request_ptr req, queuedRequests) {
149 req->setFailure(error, "hostname lookup failure");
152 // name lookup failure, abandon all requests on this connection
153 sentRequests.clear();
154 queuedRequests.clear();
157 NetChat::handleError(error);
// NOTE(review): activeRequest is dereferenced below; the guarding condition
// is on a line not visible in this listing - confirm a null check exists.
159 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
160 activeRequest->setFailure(error, "socket error");
161 activeRequest = NULL;
164 state = STATE_SOCKET_ERROR;
// Called when the socket closes. After forwarding to NetChat, the visible
// code marks the connection STATE_CLOSED, fails and removes the active
// request where applicable, and moves any remaining sent requests back to
// the front of queuedRequests so they are re-sent on reconnect.
// NOTE(review): the branch structure (else / closing braces) is on lines not
// visible in this listing; the comments below describe statements only.
167 virtual void handleClose()
169 NetChat::handleClose();
171 // closing of the connection from the server side when getting the body,
172 bool canCloseState = (state == STATE_GETTING_BODY);
173 if (canCloseState && activeRequest) {
174 // force state here, so responseComplete can avoid closing the
176 state = STATE_CLOSED;
// Reported to the request as an HTTP-style 500 failure.
180 activeRequest->setFailure(500, "server closed connection");
181 // remove the failed request from sentRequests, so it does
183 RequestList::iterator it = std::find(sentRequests.begin(),
184 sentRequests.end(), activeRequest);
185 if (it != sentRequests.end()) {
186 sentRequests.erase(it);
188 activeRequest = NULL;
191 state = STATE_CLOSED;
194 if (sentRequests.empty()) {
198 // restore sent requests to the queue, so they will be re-sent
199 // when the connection opens again
// Inserted at the front, so the re-sent requests keep their order and
// precede anything already queued.
200 queuedRequests.insert(queuedRequests.begin(),
201 sentRequests.begin(), sentRequests.end());
202 sentRequests.clear();
207 NetChat::handleError(ETIMEDOUT);
209 SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
210 activeRequest->setFailure(ETIMEDOUT, "socket timeout");
211 activeRequest = NULL;
214 state = STATE_SOCKET_ERROR;
// Append request `r` to this connection's queue and immediately attempt to
// send the request at the head of the queue.
217 void queueRequest(const Request_ptr& r)
219 queuedRequests.push_back(r);
220 tryStartNextRequest();
225 assert(!sentRequests.empty());
226 assert(state == STATE_WAITING_FOR_RESPONSE);
228 activeRequest = sentRequests.front();
230 activeRequest->responseStart(buffer);
231 state = STATE_GETTING_HEADERS;
233 if (activeRequest->responseCode() == 204) {
234 noMessageBody = true;
235 } else if (activeRequest->method() == "HEAD") {
236 noMessageBody = true;
238 noMessageBody = false;
241 bodyTransferSize = -1;
242 chunkedTransfer = false;
243 contentGZip = contentDeflate = false;
// Send the request at the head of queuedRequests, if any: connect when the
// connection is closed, build the request line and headers, push them (and
// any body) to the socket, move the request to sentRequests, then recurse to
// pipeline the next queued request.
// NOTE(review): several lines (early returns, else branches, the `buf` and
// `bodyData` declarations) are not visible in this listing.
246 void tryStartNextRequest()
248 if (queuedRequests.empty()) {
// NOTE(review): this test uses '>' while shouldStartNext() uses
// 'size() < MAX_INFLIGHT_REQUESTS'. As written, one request more than
// MAX_INFLIGHT_REQUESTS can be in flight; '>=' would make the two agree.
253 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
257 if (state == STATE_CLOSED) {
258 if (!connectToHost()) {
// Responses are read line by line, so start with a CRLF terminator.
262 setTerminator("\r\n");
266 Request_ptr r = queuedRequests.front();
268 requestBodyBytesToSend = r->requestBodyLength();
270 stringstream headerData;
271 string path = r->path();
272 assert(!path.empty());
273 string query = r->query();
// When a proxy is configured the request line carries an absolute URI.
// NOTE(review): this uses r->host(), whereas the Host header below uses
// r->hostAndPort(); a non-default port appears to be dropped from the URI
// sent to the proxy - confirm.
276 if (!client->proxyHost().empty()) {
277 path = r->scheme() + "://" + r->host() + r->path();
// URL-encoded body: the query string becomes the request body.
280 if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
281 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
// NOTE(review): std::string::substr(1) throws std::out_of_range when `query`
// is empty; confirm query() is never empty for this body type.
282 bodyData = query.substr(1); // URL-encode, drop the leading '?'
283 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
284 headerData << "Content-Length:" << bodyData.size() << "\r\n";
// Otherwise the query stays in the request line.
286 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
287 if (requestBodyBytesToSend >= 0) {
288 headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
289 headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
293 headerData << "Host: " << r->hostAndPort() << "\r\n";
294 headerData << "User-Agent:" << client->userAgent() << "\r\n";
// Advertises the two content encodings that expandCompressedData() handles.
295 headerData << "Accept-Encoding: deflate, gzip\r\n";
296 if (!client->proxyAuth().empty()) {
297 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
// Caller-supplied headers are appended after the standard ones.
300 BOOST_FOREACH(string h, r->requestHeaders()) {
301 headerData << h << ": " << r->header(h) << "\r\n";
304 headerData << "\r\n"; // final CRLF to terminate the headers
305 if (!bodyData.empty()) {
306 headerData << bodyData;
// The header block (plus any URL-encoded body) is pushed as a C string.
309 bool ok = push(headerData.str().c_str());
311 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
312 // we've over-stuffed the socket, give up for now, let things
313 // drain down before trying to start any more requests.
// Stream the request body in chunks of up to 4096 bytes.
317 while (requestBodyBytesToSend > 0) {
319 int len = r->getBodyData(buf, 4096);
321 requestBodyBytesToSend -= len;
322 if (!bufferSend(buf, len)) {
323 SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
324 state = STATE_SOCKET_ERROR;
327 // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
329 SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
334 // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
335 // "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
336 // "\n\t on connection " << this);
337 // successfully sent, remove from queue, and maybe send the next
338 queuedRequests.pop_front();
339 sentRequests.push_back(r);
340 state = STATE_WAITING_FOR_RESPONSE;
342 // pipelining, let's maybe send the next request right away
343 tryStartNextRequest();
// Receive `n` bytes at `s` from the socket. The byte count is reported to
// the client for transfer-rate accounting. While a body (or chunk payload)
// is being read the bytes go to the active request, via zlib when the
// content is gzip/deflate encoded; the visible fallback appends them to
// `buffer`.
346 virtual void collectIncomingData(const char* s, int n)
349 client->receivedBytes(static_cast<unsigned int>(n));
351 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
352 if (contentGZip || contentDeflate) {
353 expandCompressedData(s, n);
355 activeRequest->processBodyBytes(s, n);
358 buffer += string(s, n);
// Feed `n` compressed bytes at `s` to zlib and pass any decompressed output
// to the active request. Input zlib has not consumed yet (zlib.avail_in
// bytes at zlib.next_in) is kept and the new bytes are appended after it in
// zlibInflateBuffer, which grows when needed.
// NOTE(review): loop opening, error-path returns and the writtenSize
// declaration are on lines not visible in this listing.
363 void expandCompressedData(const char* s, int n)
365 int reqSize = n + zlib.avail_in;
366 if (reqSize > zlibInflateBufferSize) {
// Grow: allocate a new buffer, copy leftover input plus the new bytes into
// it, then release the old one.
// NOTE(review): the malloc() result is used without a null check.
368 unsigned char* newBuf = (unsigned char*) malloc(reqSize);
369 memcpy(newBuf, zlib.next_in, zlib.avail_in);
370 memcpy(newBuf + zlib.avail_in, s, n);
371 free(zlibInflateBuffer);
372 zlibInflateBuffer = newBuf;
373 zlibInflateBufferSize = reqSize;
375 // important to use memmove here, since it's very likely
376 // the source and destination ranges overlap
377 memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
378 memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
// Point zlib at the compacted input and at the whole output buffer.
381 zlib.next_in = (unsigned char*) zlibInflateBuffer;
382 zlib.avail_in = reqSize;
383 zlib.next_out = zlibOutputBuffer;
384 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
// For gzip content the member header must be consumed before inflating.
386 if (contentGZip && !handleGZipHeader()) {
392 int result = inflate(&zlib, Z_NO_FLUSH);
393 if (result == Z_OK || result == Z_STREAM_END) {
396 SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
// Bytes produced by this inflate() call.
400 writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
401 if (result == Z_STREAM_END) {
// Repeat while nothing was produced but input remains.
404 } while ((writtenSize == 0) && (zlib.avail_in > 0));
406 if (writtenSize > 0) {
407 activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
// Inspect the start of zlibInflateBuffer for a gzip member header (RFC 1952)
// and, once the complete header is available, advance zlib.next_in /
// zlib.avail_in past it and mark the content as plain deflate.
// The visible failure paths return false both when more bytes are needed and
// when the magic/method bytes do not match, so the caller cannot tell
// "wait for more data" from "not gzip".
// NOTE(review): the header-size increments, closing braces and the final
// return are on lines not visible in this listing.
411 bool handleGZipHeader()
413 // we clear this down to contentDeflate once the GZip header has been seen
414 if (zlib.avail_in < GZIP_HEADER_SIZE) {
415 return false; // need more header bytes
// Bytes 0-2 are ID1, ID2 and the compression method.
418 if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
419 (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
420 (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
422 return false; // invalid GZip header
// Byte 3 is FLG; its bits announce the optional header fields.
425 char flags = zlibInflateBuffer[3];
426 unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
427 if (flags & GZIP_HEADER_FEXTRA) {
429 if (zlib.avail_in < gzipHeaderSize) {
430 return false; // need more header bytes
// NOTE(review): GZIP_HEADER_FEXTRA is the FLG bit mask (1 << 2 == 4), not a
// byte offset. RFC 1952 puts the 2-byte little-endian XLEN field directly
// after the 10-byte fixed header, so this reads bytes 4-5 (inside MTIME)
// rather than bytes 10-11; the offset should presumably be GZIP_HEADER_SIZE.
// The cast is also a possibly unaligned 16-bit load; building the value from
// two bytes would remove both that and the endian swap below.
433 unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
434 if ( sgIsBigEndian() ) {
435 sgEndianSwap( &extraHeaderBytes );
438 gzipHeaderSize += extraHeaderBytes;
439 if (zlib.avail_in < gzipHeaderSize) {
440 return false; // need more header bytes
// Optional zero-terminated file name: scan for the terminating 0 byte.
444 if (flags & GZIP_HEADER_FNAME) {
446 while (gzipHeaderSize <= zlib.avail_in) {
447 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
448 break; // found terminating NULL character
// Optional zero-terminated comment: same scan.
453 if (flags & GZIP_HEADER_COMMENT) {
455 while (gzipHeaderSize <= zlib.avail_in) {
456 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
457 break; // found terminating NULL character
462 if (flags & GZIP_HEADER_CRC) {
466 if (zlib.avail_in < gzipHeaderSize) {
467 return false; // need more header bytes
// Skip the whole header; what follows is the deflate stream.
470 zlib.next_in += gzipHeaderSize;
471 zlib.avail_in -= gzipHeaderSize;
472 // now we've processed the GZip header, can decode as deflate
474 contentDeflate = true;
// Called when the current terminator (or byte count) has been reached;
// dispatches on the connection state.
// NOTE(review): most case bodies are on lines not visible in this listing.
478 virtual void foundTerminator(void)
482 case STATE_WAITING_FOR_RESPONSE:
486 case STATE_GETTING_HEADERS:
491 case STATE_GETTING_BODY:
// A line read in this state is a chunk header.
495 case STATE_GETTING_CHUNKED:
496 processChunkHeader();
// Chunk payload finished: go back to line mode for the next chunk header.
499 case STATE_GETTING_CHUNKED_BYTES:
500 setTerminator("\r\n");
501 state = STATE_GETTING_CHUNKED;
506 case STATE_GETTING_TRAILER:
512 SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
// For a connection in STATE_IDLE, reports whether idleTime has exceeded ten
// seconds. The result for other states is on a line not visible here.
519 bool hasIdleTimeout() const
521 if (state != STATE_IDLE) {
// An idle connection must have no requests awaiting a response.
525 assert(sentRequests.empty());
526 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
// For a connection that is not idle, reports whether idleTime has exceeded
// 30 seconds. The result for STATE_IDLE is on a line not visible here.
529 bool hasErrorTimeout() const
531 if (state == STATE_IDLE) {
535 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
// True when the connection is in STATE_SOCKET_ERROR.
538 bool hasError() const
540 return (state == STATE_SOCKET_ERROR);
// True when a request is queued and fewer than MAX_INFLIGHT_REQUESTS are
// awaiting a response.
// NOTE(review): tryStartNextRequest() bails out on
// 'size() > MAX_INFLIGHT_REQUESTS', which is not the exact complement of the
// '<' used here.
543 bool shouldStartNext() const
545 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
// True when the connection has any queued or sent (unanswered) request.
548 bool isActive() const
550 return !queuedRequests.empty() || !sentRequests.empty();
555 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
558 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
562 if (connect(host.c_str(), port) != 0) {
572 string h = strutils::simplify(buffer);
573 if (h.empty()) { // blank line terminates headers
576 if (contentGZip || contentDeflate) {
577 memset(&zlib, 0, sizeof(z_stream));
578 if (!zlibOutputBuffer) {
579 zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
582 // NULLs means we'll get default alloc+free methods
583 // which is absolutely fine
584 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
585 zlib.next_out = zlibOutputBuffer;
586 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
587 SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
591 if (chunkedTransfer) {
592 state = STATE_GETTING_CHUNKED;
593 } else if (noMessageBody || (bodyTransferSize == 0)) {
594 // force the state to GETTING_BODY, to simplify logic in
595 // responseComplete and handleClose
596 state = STATE_GETTING_BODY;
599 setByteCount(bodyTransferSize); // may be -1, that's fine
600 state = STATE_GETTING_BODY;
606 int colonPos = buffer.find(':');
608 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
612 string key = strutils::simplify(buffer.substr(0, colonPos));
613 string lkey = boost::to_lower_copy(key);
614 string value = strutils::strip(buffer.substr(colonPos + 1));
616 // only consider these if getting headers (as opposed to trailers
617 // of a chunked transfer)
618 if (state == STATE_GETTING_HEADERS) {
619 if (lkey == "content-length") {
621 int sz = strutils::to_int(value);
622 if (bodyTransferSize <= 0) {
623 bodyTransferSize = sz;
625 activeRequest->setResponseLength(sz);
626 } else if (lkey == "transfer-length") {
627 bodyTransferSize = strutils::to_int(value);
628 } else if (lkey == "transfer-encoding") {
629 processTransferEncoding(value);
630 } else if (lkey == "content-encoding") {
631 if (value == "gzip") {
633 } else if (value == "deflate") {
634 contentDeflate = true;
635 } else if (value != "identity") {
636 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
641 activeRequest->responseHeader(lkey, value);
// Handle a Transfer-Encoding header value `te`. Only the exact string
// "chunked" is recognised; the visible alternative logs a warning.
644 void processTransferEncoding(const string& te)
646 if (te == "chunked") {
647 chunkedTransfer = true;
649 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Parse one chunk-size line of a chunked response held in `buffer`. The size
// is hexadecimal; anything from the first ';' on (chunk extensions) is
// ignored. Size 0 starts the trailer, otherwise the connection switches to
// reading exactly chunkSize payload bytes.
// NOTE(review): the chunkSize declaration and the tests on semiPos are on
// lines not visible in this listing.
654 void processChunkHeader()
656 if (buffer.empty()) {
657 // blank line after chunk data
// std::string::find returns size_t (npos when absent); it is narrowed to
// int here.
662 int semiPos = buffer.find(';');
664 // extensions ignored for the moment
665 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
667 chunkSize = strutils::to_int(buffer, 16);
671 if (chunkSize == 0) { // trailer start
672 state = STATE_GETTING_TRAILER;
676 state = STATE_GETTING_CHUNKED_BYTES;
677 setByteCount(chunkSize);
// Handle one line read while in the trailer of a chunked response. The empty
// line case is tested first; per the comment below, other lines are treated
// like ordinary headers.
// NOTE(review): the statements in both branches are not visible in this
// listing.
680 void processTrailer()
682 if (buffer.empty()) {
688 // process as a normal header
// Tell the active request that all of its response headers have been seen.
692 void headersComplete()
694 activeRequest->responseHeadersComplete();
// Finish the active request: notify the request and the client, drop it from
// the front of sentRequests, optionally start the next queued request, and
// choose the next connection state.
// NOTE(review): the zlib clean-up statement, the use of `doClose` and some
// braces are on lines not visible in this listing.
697 void responseComplete()
699 // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
700 activeRequest->responseComplete();
701 client->requestFinished(this);
703 if (contentDeflate) {
// The completed request must be the oldest one still awaiting a response.
707 assert(sentRequests.front() == activeRequest);
708 sentRequests.pop_front();
// Read the flag before activeRequest is cleared.
709 bool doClose = activeRequest->closeAfterComplete();
710 activeRequest = NULL;
712 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
714 // this will bring us into handleClose() above, which updates
715 // state to STATE_CLOSED
718 // if we have additional requests waiting, try to start them now
719 tryStartNextRequest();
// Unless the connection was closed: idle when nothing is outstanding,
// otherwise wait for the next pipelined response.
723 if (state != STATE_CLOSED) {
724 state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
// Responses start in line mode again.
726 setTerminator("\r\n");
// States of the per-connection response parser.
// NOTE(review): some enumerators (e.g. those used elsewhere as STATE_IDLE,
// STATE_GETTING_BODY, STATE_SOCKET_ERROR) are on lines not visible here.
729 enum ConnectionState {
// Request(s) sent; waiting for a status line.
731 STATE_WAITING_FOR_RESPONSE,
// Reading response header lines.
732 STATE_GETTING_HEADERS,
// Chunked transfer: expecting a chunk-size line.
734 STATE_GETTING_CHUNKED,
// Chunked transfer: reading the bytes of the current chunk.
735 STATE_GETTING_CHUNKED_BYTES,
// Chunked transfer: reading trailer lines after the zero-size chunk.
736 STATE_GETTING_TRAILER,
738 STATE_CLOSED ///< connection should be closed now
// Per-connection state.
// NOTE(review): some member declarations are on lines not visible here.
// Request whose response is currently being parsed (NULL when none).
742 Request_ptr activeRequest;
743 ConnectionState state;
// Expected body length in bytes; -1 means unknown.
747 int bodyTransferSize;
// Compared against the 10 s / 30 s limits in hasIdleTimeout() and
// hasErrorTimeout().
748 SGTimeStamp idleTime;
749 bool chunkedTransfer;
// Remaining request-body bytes to send for the request being started.
751 int requestBodyBytesToSend;
// zlib input staging buffer (malloc'd, grows on demand) and its capacity.
754 unsigned char* zlibInflateBuffer;
755 int zlibInflateBufferSize;
// zlib output buffer of ZLIB_DECOMPRESS_BUFFER_SIZE bytes (malloc'd).
756 unsigned char* zlibOutputBuffer;
// Content-Encoding of the current response; gzip is switched to deflate
// once its header has been consumed.
757 bool contentGZip, contentDeflate;
// Requests not yet sent, and requests sent but not yet answered.
759 RequestList queuedRequests;
760 RequestList sentRequests;
767 d->maxConnections = 4;
768 d->bytesTransferred = 0;
769 d->lastTransferRate = 0;
770 d->timeTransferSample.stamp();
772 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
// Set the maximum number of simultaneous connections.
// Throws sg_range_exception for an illegal value; the test that decides what
// is illegal is on a line not visible in this listing.
779 void Client::setMaxConnections(unsigned int maxCon)
782 throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
785 d->maxConnections = maxCon;
// Drive the client: poll all sockets for up to `waitTimeout`, retire
// connections that are idle, timed out, or inactive while other requests are
// waiting, let the remaining connections start queued requests, and finally
// re-submit pending requests if a connection slot is free.
// NOTE(review): parts of the removal branch (including what happens to the
// Connection object) and the re-submit call are on lines not visible here.
788 void Client::update(int waitTimeout)
790 d->poller.poll(waitTimeout);
791 bool waitingRequests = !d->pendingRequests.empty();
// The iterator is advanced inside the loop body because entries may be
// erased while iterating.
793 ConnectionDict::iterator it = d->connections.begin();
794 for (; it != d->connections.end(); ) {
795 Connection* con = it->second;
796 if (con->hasIdleTimeout() ||
798 con->hasErrorTimeout() ||
799 (!con->isActive() && waitingRequests))
801 if (con->hasErrorTimeout()) {
802 // tell the connection we're timing it out
803 con->handleTimeout();
806 // connection has been idle for a while, clean it up
807 // (or if we have requests waiting for a different host,
808 // or an error condition)
// Post-increment keeps `it` valid: only the erased element's iterator is
// invalidated in a multimap.
809 ConnectionDict::iterator del = it++;
811 d->connections.erase(del);
813 if (it->second->shouldStartNext()) {
814 it->second->tryStartNextRequest();
818 } // of connection iteration
820 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
// Work on a copy and clear the original first, since re-submitting may
// append to pendingRequests again.
821 RequestList waiting(d->pendingRequests);
822 d->pendingRequests.clear();
824 // re-submit all waiting requests in order; this takes care of
825 // finding multiple pending items targeted to the same (new)
827 BOOST_FOREACH(Request_ptr req, waiting) {
// Assign request `r` to a connection. Connections are keyed by "host-port".
// At the connection limit the request goes to an existing connection with
// the same key or is parked in pendingRequests; otherwise a new Connection
// is created and registered with the poller.
// NOTE(review): the proxy override of host/port, several branch conditions
// and the assignments to `con` are on lines not visible in this listing.
833 void Client::makeRequest(const Request_ptr& r)
835 string host = r->host();
836 int port = r->port();
837 if (!d->proxy.empty()) {
842 Connection* con = NULL;
// Build the multimap key identifying the target server.
844 ss << host << "-" << port;
845 string connectionId = ss.str();
846 bool havePending = !d->pendingRequests.empty();
847 ConnectionDict::iterator consEnd = d->connections.end();
849 // assign request to an existing Connection.
850 // various options exist here, examined in order
851 if (d->connections.size() >= d->maxConnections) {
852 ConnectionDict::iterator it = d->connections.find(connectionId);
854 // maximum number of connections active, queue this request
855 // when a connection goes inactive, we'll start this one
856 d->pendingRequests.push_back(r);
860 // scan for an idle Connection to the same host (likely if we're
861 // retrieving multiple resources from the same host in quick succession)
862 // if we have pending requests (waiting for a free Connection), then
863 // force new requests on this id to always use the first Connection
864 // (instead of the random selection below). This ensures that when
865 // there's pressure on the number of connections to keep alive, one
866 // host can't DoS every other.
// `count` ends up as the number of connections visited with this key.
868 for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
869 if (havePending || !it->second->isActive()) {
876 // we have at least one connection to the host, but they are
877 // all active - we need to pick one to queue the request on.
878 // we use random but round-robin would also work.
// NOTE(review): the loop below only decrements `index`; its body is empty
// and `it` is never advanced, so the first connection for this key is always
// chosen and the random index has no effect. Advancing `it` in the loop
// (e.g. '--index, ++it') looks like the intent.
879 int index = rand() % count;
880 for (it = d->connections.find(connectionId); index > 0; --index) { ; }
883 } // of at max connections limit
885 // allocate a new connection object
// The Connection is allocated with raw new and stored by pointer in the
// multimap.
887 con = new Connection(this);
888 con->setServer(host, port);
889 d->poller.addChannel(con);
890 d->connections.insert(d->connections.end(),
891 ConnectionDict::value_type(connectionId, con));
894 con->queueRequest(r);
897 void Client::requestFinished(Connection* con)
902 void Client::setUserAgent(const string& ua)
907 const std::string& Client::userAgent() const
912 const std::string& Client::proxyHost() const
917 const std::string& Client::proxyAuth() const
922 void Client::setProxy(const string& proxy, int port, const string& auth)
// True as soon as any connection reports queued or in-flight requests. The
// result when none does is on a line not visible in this listing.
929 bool Client::hasActiveRequests() const
931 ConnectionDict::const_iterator it = d->connections.begin();
932 for (; it != d->connections.end(); ++it) {
933 if (it->second->isActive()) return true;
// Add `count` received bytes to the running total used by
// transferRateBytesPerSec().
939 void Client::receivedBytes(unsigned int count)
941 d->bytesTransferred += count;
// Transfer rate in bytes per second over the time since the previous sample.
// A call made too soon after the previous one gets the cached rate back;
// otherwise the rate is recomputed and the sample window and byte counter
// are reset.
// NOTE(review): the test on `e` that selects the cached path and the final
// return are on lines not visible in this listing.
944 unsigned int Client::transferRateBytesPerSec() const
// Milliseconds elapsed since the sample window started.
946 unsigned int e = d->timeTransferSample.elapsedMSec();
948 // if called too frequently, return cached value, to smooth out
949 // < 1 sec changes in flow
950 return d->lastTransferRate;
// NOTE(review): bytesTransferred is an unsigned int, so 'bytes * 1000' wraps
// where unsigned int is 32 bits once more than about 4.29 MB arrive in one
// sample window, under-reporting the rate. Doing the multiplication in a
// 64-bit type would avoid this. Also confirm `e` cannot be 0 on this path.
953 unsigned int ratio = (d->bytesTransferred * 1000) / e;
954 d->timeTransferSample.stamp();
955 d->bytesTransferred = 0;
956 d->lastTransferRate = ratio;
960 } // of namespace HTTP
962 } // of namespace simgear