2 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
5 // Written by James Turner
7 // Copyright (C) 2013 James Turner <zakalawe@mac.com>
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Library General Public License for more details.
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
24 #include "HTTPClient.hxx"
33 #include <boost/foreach.hpp>
34 #include <boost/algorithm/string/case_conv.hpp>
38 #include <simgear/io/sg_netChat.hxx>
39 #include <simgear/io/lowlevel.hxx>
40 #include <simgear/misc/strutils.hxx>
41 #include <simgear/compiler.h>
42 #include <simgear/debug/logstream.hxx>
43 #include <simgear/timing/timestamp.hxx>
44 #include <simgear/structure/exception.hxx>
46 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
49 # if !defined(SIMGEAR_VERSION)
50 # define SIMGEAR_VERSION "simgear-development"
55 using std::stringstream;
// Port a Connection starts out with, before setServer() assigns the real
// one. The explicit `extern` keeps this const object externally linked, so
// other translation units may refer to the definition made here.
extern const int DEFAULT_HTTP_PORT = 80;

// Pipelining limit: how many sent-but-unanswered requests a single
// Connection is allowed to accumulate before it stops sending more.
const unsigned int MAX_INFLIGHT_REQUESTS = 32u;

// MIME type for form-encoded request bodies. When a request reports this
// body type, its query string (minus the leading '?') is sent as the body
// rather than being appended to the request path.
// NOTE(review): only the characters are const, not the pointer, so this is
// a reassignable global with external linkage; `const char* const` would be
// safer once it is confirmed no other translation unit declares it extern.
const char* CONTENT_TYPE_URL_ENCODED =
    "application/x-www-form-urlencoded";
67 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
68 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
// gzip member header layout, per RFC 1952
// (http://www.ietf.org/rfc/rfc1952.txt), which also describes the parsing
// logic that uses these values.

// Magic bytes ID1/ID2 (0x1f, 0x8b) that open every gzip member.
const int GZIP_HEADER_ID1 = 0x1f;
const int GZIP_HEADER_ID2 = 0x8b;

// Compression-method byte (CM) value meaning "deflate".
const int GZIP_HEADER_METHOD_DEFLATE = 0x08;

// Length of the fixed header part: ID1, ID2, CM, FLG, MTIME (4 bytes),
// XFL and OS.
const unsigned int GZIP_HEADER_SIZE = 10u;

// Bits of the FLG byte announcing optional header sections, in bit order.
const int GZIP_HEADER_CRC = 0x02;     // FHCRC: 2-byte header CRC16
const int GZIP_HEADER_FEXTRA = 0x04;  // FEXTRA: length-prefixed extra field
const int GZIP_HEADER_FNAME = 0x08;   // FNAME: zero-terminated file name
const int GZIP_HEADER_COMMENT = 0x10; // FCOMMENT: zero-terminated comment
82 typedef std::multimap<std::string, Connection*> ConnectionDict;
83 typedef std::list<Request_ptr> RequestList;
85 class Client::ClientPrivate
88 std::string userAgent;
91 std::string proxyAuth;
92 NetChannelPoller poller;
93 unsigned int maxConnections;
95 RequestList pendingRequests;
97 // connections by host (potentially more than one)
98 ConnectionDict connections;
101 class Connection : public NetChat
104 Connection(Client* pr) :
107 port(DEFAULT_HTTP_PORT),
108 zlibInflateBuffer(NULL),
109 zlibInflateBufferSize(0),
110 zlibOutputBuffer(NULL)
115 virtual ~Connection()
117 if (zlibInflateBuffer) {
118 free(zlibInflateBuffer);
121 if (zlibOutputBuffer) {
122 free(zlibOutputBuffer);
126 void setServer(const string& h, short p)
132 // socket-level errors
133 virtual void handleError(int error)
135 if (error == ENOENT) {
136 // name lookup failure
137 // we won't have an active request yet, so the logic below won't
138 // fire to actually call setFailure. Let's fail all of the requests
139 BOOST_FOREACH(Request_ptr req, sentRequests) {
140 req->setFailure(error, "hostname lookup failure");
143 BOOST_FOREACH(Request_ptr req, queuedRequests) {
144 req->setFailure(error, "hostname lookup failure");
147 // name lookup failure, abandon all requests on this connection
148 sentRequests.clear();
149 queuedRequests.clear();
152 NetChat::handleError(error);
154 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
155 activeRequest->setFailure(error, "socket error");
156 activeRequest = NULL;
159 state = STATE_SOCKET_ERROR;
162 virtual void handleClose()
164 NetChat::handleClose();
166 // closing of the connection from the server side when getting the body,
167 bool canCloseState = (state == STATE_GETTING_BODY);
168 if (canCloseState && activeRequest) {
169 // force state here, so responseComplete can avoid closing the
171 state = STATE_CLOSED;
175 activeRequest->setFailure(500, "server closed connection");
176 // remove the failed request from sentRequests, so it does
178 RequestList::iterator it = std::find(sentRequests.begin(),
179 sentRequests.end(), activeRequest);
180 if (it != sentRequests.end()) {
181 sentRequests.erase(it);
183 activeRequest = NULL;
186 state = STATE_CLOSED;
189 if (sentRequests.empty()) {
193 // restore sent requests to the queue, so they will be re-sent
194 // when the connection opens again
195 queuedRequests.insert(queuedRequests.begin(),
196 sentRequests.begin(), sentRequests.end());
197 sentRequests.clear();
200 void queueRequest(const Request_ptr& r)
202 queuedRequests.push_back(r);
203 tryStartNextRequest();
208 assert(!sentRequests.empty());
210 activeRequest = sentRequests.front();
211 activeRequest->responseStart(buffer);
212 state = STATE_GETTING_HEADERS;
214 if (activeRequest->responseCode() == 204) {
215 noMessageBody = true;
216 } else if (activeRequest->method() == "HEAD") {
217 noMessageBody = true;
219 noMessageBody = false;
222 bodyTransferSize = -1;
223 chunkedTransfer = false;
224 contentGZip = contentDeflate = false;
227 void tryStartNextRequest()
229 if (queuedRequests.empty()) {
234 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
238 if (state == STATE_CLOSED) {
239 if (!connectToHost()) {
243 setTerminator("\r\n");
247 Request_ptr r = queuedRequests.front();
249 requestBodyBytesToSend = r->requestBodyLength();
251 stringstream headerData;
252 string path = r->path();
253 assert(!path.empty());
254 string query = r->query();
257 if (!client->proxyHost().empty()) {
258 path = r->scheme() + "://" + r->host() + r->path();
261 if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
262 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
263 bodyData = query.substr(1); // URL-encode, drop the leading '?'
264 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
265 headerData << "Content-Length:" << bodyData.size() << "\r\n";
267 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
268 if (requestBodyBytesToSend >= 0) {
269 headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
270 headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
274 headerData << "Host: " << r->hostAndPort() << "\r\n";
275 headerData << "User-Agent:" << client->userAgent() << "\r\n";
276 headerData << "Accept-Encoding: deflate, gzip\r\n";
277 if (!client->proxyAuth().empty()) {
278 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
281 BOOST_FOREACH(string h, r->requestHeaders()) {
282 headerData << h << ": " << r->header(h) << "\r\n";
285 headerData << "\r\n"; // final CRLF to terminate the headers
286 if (!bodyData.empty()) {
287 headerData << bodyData;
290 bool ok = push(headerData.str().c_str());
292 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
293 // we've over-stuffed the socket, give up for now, let things
294 // drain down before trying to start any more requests.
298 while (requestBodyBytesToSend > 0) {
300 int len = r->getBodyData(buf, 4096);
302 requestBodyBytesToSend -= len;
303 if (!bufferSend(buf, len)) {
304 SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
305 state = STATE_SOCKET_ERROR;
308 // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
310 SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
315 SG_LOG(SG_IO, SG_DEBUG, "did start request:" << r->url() <<
316 "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
317 "\n\t on connection " << this);
318 // successfully sent, remove from queue, and maybe send the next
319 queuedRequests.pop_front();
320 sentRequests.push_back(r);
322 // pipelining, let's maybe send the next request right away
323 tryStartNextRequest();
326 virtual void collectIncomingData(const char* s, int n)
329 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
330 if (contentGZip || contentDeflate) {
331 expandCompressedData(s, n);
333 activeRequest->processBodyBytes(s, n);
336 buffer += string(s, n);
341 void expandCompressedData(const char* s, int n)
343 int reqSize = n + zlib.avail_in;
344 if (reqSize > zlibInflateBufferSize) {
346 unsigned char* newBuf = (unsigned char*) malloc(reqSize);
347 memcpy(newBuf, zlib.next_in, zlib.avail_in);
348 memcpy(newBuf + zlib.avail_in, s, n);
349 free(zlibInflateBuffer);
350 zlibInflateBuffer = newBuf;
351 zlibInflateBufferSize = reqSize;
353 // important to use memmove here, since it's very likely
354 // the source and destination ranges overlap
355 memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
356 memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
359 zlib.next_in = (unsigned char*) zlibInflateBuffer;
360 zlib.avail_in = reqSize;
361 zlib.next_out = zlibOutputBuffer;
362 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
364 if (contentGZip && !handleGZipHeader()) {
370 int result = inflate(&zlib, Z_NO_FLUSH);
371 if (result == Z_OK || result == Z_STREAM_END) {
374 SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
378 writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
379 if (result == Z_STREAM_END) {
382 } while ((writtenSize == 0) && (zlib.avail_in > 0));
384 if (writtenSize > 0) {
385 activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
389 bool handleGZipHeader()
391 // we clear this down to contentDeflate once the GZip header has been seen
392 if (zlib.avail_in < GZIP_HEADER_SIZE) {
393 return false; // need more header bytes
396 if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
397 (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
398 (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
400 return false; // invalid GZip header
403 char flags = zlibInflateBuffer[3];
404 unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
405 if (flags & GZIP_HEADER_FEXTRA) {
407 if (zlib.avail_in < gzipHeaderSize) {
408 return false; // need more header bytes
411 unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
412 if ( sgIsBigEndian() ) {
413 sgEndianSwap( &extraHeaderBytes );
416 gzipHeaderSize += extraHeaderBytes;
417 if (zlib.avail_in < gzipHeaderSize) {
418 return false; // need more header bytes
422 if (flags & GZIP_HEADER_FNAME) {
424 while (gzipHeaderSize <= zlib.avail_in) {
425 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
426 break; // found terminating NULL character
431 if (flags & GZIP_HEADER_COMMENT) {
433 while (gzipHeaderSize <= zlib.avail_in) {
434 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
435 break; // found terminating NULL character
440 if (flags & GZIP_HEADER_CRC) {
444 if (zlib.avail_in < gzipHeaderSize) {
445 return false; // need more header bytes
448 zlib.next_in += gzipHeaderSize;
449 zlib.avail_in -= gzipHeaderSize;
450 // now we've processed the GZip header, can decode as deflate
452 contentDeflate = true;
456 virtual void foundTerminator(void)
464 case STATE_GETTING_HEADERS:
469 case STATE_GETTING_BODY:
473 case STATE_GETTING_CHUNKED:
474 processChunkHeader();
477 case STATE_GETTING_CHUNKED_BYTES:
478 setTerminator("\r\n");
479 state = STATE_GETTING_CHUNKED;
484 case STATE_GETTING_TRAILER:
494 bool hasIdleTimeout() const
496 if (state != STATE_IDLE) {
500 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
503 bool hasErrorTimeout() const
505 if (state == STATE_IDLE) {
509 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
512 bool hasError() const
514 return (state == STATE_SOCKET_ERROR);
517 bool shouldStartNext() const
519 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
522 bool isActive() const
524 return !queuedRequests.empty() || !sentRequests.empty();
529 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
532 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
536 if (connect(host.c_str(), port) != 0) {
546 string h = strutils::simplify(buffer);
547 if (h.empty()) { // blank line terminates headers
550 if (contentGZip || contentDeflate) {
551 memset(&zlib, 0, sizeof(z_stream));
552 if (!zlibOutputBuffer) {
553 zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
556 // NULLs means we'll get default alloc+free methods
557 // which is absolutely fine
558 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
559 zlib.next_out = zlibOutputBuffer;
560 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
561 SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
565 if (chunkedTransfer) {
566 state = STATE_GETTING_CHUNKED;
567 } else if (noMessageBody || (bodyTransferSize == 0)) {
568 // force the state to GETTING_BODY, to simplify logic in
569 // responseComplete and handleClose
570 state = STATE_GETTING_BODY;
573 setByteCount(bodyTransferSize); // may be -1, that's fine
574 state = STATE_GETTING_BODY;
580 int colonPos = buffer.find(':');
582 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
586 string key = strutils::simplify(buffer.substr(0, colonPos));
587 string lkey = boost::to_lower_copy(key);
588 string value = strutils::strip(buffer.substr(colonPos + 1));
590 // only consider these if getting headers (as opposed to trailers
591 // of a chunked transfer)
592 if (state == STATE_GETTING_HEADERS) {
593 if (lkey == "content-length") {
595 int sz = strutils::to_int(value);
596 if (bodyTransferSize <= 0) {
597 bodyTransferSize = sz;
599 activeRequest->setResponseLength(sz);
600 } else if (lkey == "transfer-length") {
601 bodyTransferSize = strutils::to_int(value);
602 } else if (lkey == "transfer-encoding") {
603 processTransferEncoding(value);
604 } else if (lkey == "content-encoding") {
605 if (value == "gzip") {
607 } else if (value == "deflate") {
608 contentDeflate = true;
609 } else if (value != "identity") {
610 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
615 activeRequest->responseHeader(lkey, value);
618 void processTransferEncoding(const string& te)
620 if (te == "chunked") {
621 chunkedTransfer = true;
623 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
628 void processChunkHeader()
630 if (buffer.empty()) {
631 // blank line after chunk data
636 int semiPos = buffer.find(';');
638 // extensions ignored for the moment
639 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
641 chunkSize = strutils::to_int(buffer, 16);
645 if (chunkSize == 0) { // trailer start
646 state = STATE_GETTING_TRAILER;
650 state = STATE_GETTING_CHUNKED_BYTES;
651 setByteCount(chunkSize);
654 void processTrailer()
656 if (buffer.empty()) {
662 // process as a normal header
666 void headersComplete()
668 activeRequest->responseHeadersComplete();
671 void responseComplete()
673 // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
674 activeRequest->responseComplete();
675 client->requestFinished(this);
677 if (contentDeflate) {
681 assert(sentRequests.front() == activeRequest);
682 sentRequests.pop_front();
683 bool doClose = activeRequest->closeAfterComplete();
684 activeRequest = NULL;
686 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
688 // this will bring us into handleClose() above, which updates
689 // state to STATE_CLOSED
692 // if we have additional requests waiting, try to start them now
693 tryStartNextRequest();
697 if (state != STATE_CLOSED) {
700 setTerminator("\r\n");
703 enum ConnectionState {
705 STATE_GETTING_HEADERS,
707 STATE_GETTING_CHUNKED,
708 STATE_GETTING_CHUNKED_BYTES,
709 STATE_GETTING_TRAILER,
711 STATE_CLOSED ///< connection should be closed now
715 Request_ptr activeRequest;
716 ConnectionState state;
720 int bodyTransferSize;
721 SGTimeStamp idleTime;
722 bool chunkedTransfer;
724 int requestBodyBytesToSend;
727 unsigned char* zlibInflateBuffer;
728 int zlibInflateBufferSize;
729 unsigned char* zlibOutputBuffer;
730 bool contentGZip, contentDeflate;
732 RequestList queuedRequests;
733 RequestList sentRequests;
740 d->maxConnections = 4;
741 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
748 void Client::setMaxConnections(unsigned int maxCon)
751 throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
754 d->maxConnections = maxCon;
757 void Client::update(int waitTimeout)
759 d->poller.poll(waitTimeout);
760 bool waitingRequests = !d->pendingRequests.empty();
762 ConnectionDict::iterator it = d->connections.begin();
763 for (; it != d->connections.end(); ) {
764 Connection* con = it->second;
765 if (con->hasIdleTimeout() ||
767 con->hasErrorTimeout() ||
768 (!con->isActive() && waitingRequests))
770 // connection has been idle for a while, clean it up
771 // (or if we have requests waiting for a different host,
772 // or an error condition
773 ConnectionDict::iterator del = it++;
775 d->connections.erase(del);
777 if (it->second->shouldStartNext()) {
778 it->second->tryStartNextRequest();
782 } // of connection iteration
784 if (waitingRequests && (d->connections.size() < d->maxConnections)) {
785 RequestList waiting(d->pendingRequests);
786 d->pendingRequests.clear();
788 // re-submit all waiting requests in order; this takes care of
789 // finding multiple pending items targetted to the same (new)
791 BOOST_FOREACH(Request_ptr req, waiting) {
797 void Client::makeRequest(const Request_ptr& r)
799 string host = r->host();
800 int port = r->port();
801 if (!d->proxy.empty()) {
806 Connection* con = NULL;
808 ss << host << "-" << port;
809 string connectionId = ss.str();
810 bool havePending = !d->pendingRequests.empty();
812 // assign request to an existing Connection.
813 // various options exist here, examined in order
814 if (d->connections.size() >= d->maxConnections) {
815 ConnectionDict::iterator it = d->connections.find(connectionId);
816 if (it == d->connections.end()) {
817 // maximum number of connections active, queue this request
818 // when a connection goes inactive, we'll start this one
819 d->pendingRequests.push_back(r);
823 // scan for an idle Connection to the same host (likely if we're
824 // retrieving multiple resources from the same host in quick succession)
825 // if we have pending requests (waiting for a free Connection), then
826 // force new requests on this id to always use the first Connection
827 // (instead of the random selection below). This ensures that when
828 // there's pressure on the number of connections to keep alive, one
829 // host can't DoS every other.
831 for (;it->first == connectionId; ++it, ++count) {
832 if (havePending || !it->second->isActive()) {
839 // we have at least one connection to the host, but they are
840 // all active - we need to pick one to queue the request on.
841 // we use random but round-robin would also work.
842 int index = random() % count;
843 for (it = d->connections.find(connectionId); index > 0; --index) { ; }
846 } // of at max connections limit
848 // allocate a new connection object
850 con = new Connection(this);
851 con->setServer(host, port);
852 d->poller.addChannel(con);
853 d->connections.insert(d->connections.end(),
854 ConnectionDict::value_type(connectionId, con));
857 con->queueRequest(r);
860 void Client::requestFinished(Connection* con)
865 void Client::setUserAgent(const string& ua)
870 const std::string& Client::userAgent() const
875 const std::string& Client::proxyHost() const
880 const std::string& Client::proxyAuth() const
885 void Client::setProxy(const string& proxy, int port, const string& auth)
892 bool Client::hasActiveRequests() const
894 ConnectionDict::const_iterator it = d->connections.begin();
895 for (; it != d->connections.end(); ++it) {
896 if (it->second->isActive()) return true;
902 } // of namespace HTTP
904 } // of namespace simgear