1 #include "HTTPClient.hxx"
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
24 # if !defined(SIMGEAR_VERSION)
25 # define SIMGEAR_VERSION "simgear-development"
30 using std::stringstream;
// Initial value of Connection::port (see the Connection constructor).
39 extern const int DEFAULT_HTTP_PORT = 80;
// Request body type for which tryStartNextRequest() sends the query string
// as the request body instead of appending it to the path.
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Limit on requests that have been sent on one connection but not yet
// answered (compared against sentRequests.size()).
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
// Size in bytes of the buffer inflate() writes decompressed data into.
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
// windowBits argument for inflateInit2(); per the zlib manual a negative
// value selects raw deflate data (no zlib header or trailer).
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic
// First two bytes of every gzip member (0x1f, 0x8b).
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
// Compression-method byte value meaning "deflate".
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
// Length in bytes of the fixed part of the gzip header.
50 const unsigned int GZIP_HEADER_SIZE = 10;
// Bit masks tested against the flags byte (header byte 3) in
// handleGZipHeader(). These are masks, not byte offsets.
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
// One HTTP connection built on NetChat. It keeps the queued and in-flight
// requests for a server and parses the responses (headers, chunked bodies,
// gzip/deflate content) that come back.
56 class Connection : public NetChat
// Construct a connection for client `pr`. The visible initialisers default
// the port to DEFAULT_HTTP_PORT and leave both zlib buffers unallocated;
// the remaining initialisers are elided from this excerpt.
59 Connection(Client* pr) :
62 port(DEFAULT_HTTP_PORT),
63 zlibInflateBuffer(NULL),
64 zlibInflateBufferSize(0),
65 zlibOutputBuffer(NULL)
// Release the two malloc()-allocated zlib buffers if they were ever created.
// NOTE(review): the enclosing function header is elided from this excerpt;
// presumably this is the destructor body — confirm.
72 if (zlibInflateBuffer) {
73 free(zlibInflateBuffer);
76 if (zlibOutputBuffer) {
77 free(zlibOutputBuffer);
// Takes the host name `h` and port `p` for this connection (body elided from
// this excerpt; presumably stores them in `host` / `port`).
// NOTE(review): `p` is a short, while Client::makeRequest() passes an int
// port. Ports above 32767 would not survive the conversion — confirm and
// consider widening the parameter.
81 void setServer(const string& h, short p)
87 // socket-level errors
// On ENOENT (treated here as a host name lookup failure) every sent and
// queued request is failed and both lists are emptied. The base-class
// handler is also invoked, the active request is failed with "socket error",
// and the connection is put into STATE_SOCKET_ERROR.
88 virtual void handleError(int error)
90 if (error == ENOENT) {
91 // name lookup failure
92 // we won't have an active request yet, so the logic below won't
93 // fire to actually call setFailure. Let's fail all of the requests
94 BOOST_FOREACH(Request_ptr req, sentRequests) {
95 req->setFailure(error, "hostname lookup failure");
98 BOOST_FOREACH(Request_ptr req, queuedRequests) {
99 req->setFailure(error, "hostname lookup failure");
102 // name lookup failure, abandon all requests on this connection
103 sentRequests.clear();
104 queuedRequests.clear();
107 NetChat::handleError(error);
// NOTE(review): the guard around the activeRequest use below is not visible
// in this excerpt — confirm activeRequest is null-checked before it is
// dereferenced (the ENOENT comment above says it may not be set yet).
109 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
110 activeRequest->setFailure(error, "socket error");
111 activeRequest = NULL;
114 state = STATE_SOCKET_ERROR;
// Socket-closed handler. Runs the base-class handler, moves the connection
// to STATE_CLOSED (both visible branches do so), and puts requests that were
// sent but not yet answered back at the front of the queue.
117 virtual void handleClose()
119 NetChat::handleClose();
// A close while a body is being read for an active request is handled
// separately; the rest of this branch is elided from the excerpt.
121 if ((state == STATE_GETTING_BODY) && activeRequest) {
122 // force state here, so responseComplete can avoid closing the
124 state = STATE_CLOSED;
127 state = STATE_CLOSED;
130 if (sentRequests.empty()) {
134 // restore sent requests to the queue, so they will be re-sent
135 // when the connection opens again
136 queuedRequests.insert(queuedRequests.begin(),
137 sentRequests.begin(), sentRequests.end());
138 sentRequests.clear();
// Append request `r` to the pending queue and immediately try to send it.
141 void queueRequest(const Request_ptr& r)
143 queuedRequests.push_back(r);
144 tryStartNextRequest();
// Start processing the response for the oldest in-flight request.
// NOTE(review): the function header is elided from this excerpt; the name
// of the enclosing method is not visible — confirm.
149 assert(!sentRequests.empty());
// The front of sentRequests becomes the active request and is handed the
// current contents of `buffer`.
151 activeRequest = sentRequests.front();
152 activeRequest->responseStart(buffer);
153 state = STATE_GETTING_HEADERS;
// Status 204 responses and responses to HEAD requests are treated as having
// no message body.
155 if (activeRequest->responseCode() == 204) {
156 noMessageBody = true;
157 } else if (activeRequest->method() == "HEAD") {
158 noMessageBody = true;
160 noMessageBody = false;
// Reset the per-response transfer state; -1 means the body size is unknown
// so far.
163 bodyTransferSize = -1;
164 chunkedTransfer = false;
165 contentGZip = contentDeflate = false;
// Send the request at the head of queuedRequests. The request line and
// headers are assembled in a stringstream and pushed to the channel; a
// request body, if any, is then sent with bufferSend(). On success the
// request moves from queuedRequests to sentRequests and the function calls
// itself again to pipeline the next one. Several early-exit statements are
// elided from this excerpt.
168 void tryStartNextRequest()
170 if (queuedRequests.empty()) {
// NOTE(review): this test uses '>' while shouldStartNext() uses '<' against
// the same limit, so it still passes when exactly MAX_INFLIGHT_REQUESTS are
// in flight — confirm which bound is intended.
175 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
// A closed connection is re-opened before anything is sent.
179 if (state == STATE_CLOSED) {
180 if (!connectToHost()) {
184 setTerminator("\r\n");
188 Request_ptr r = queuedRequests.front();
189 requestBodyBytesToSend = r->requestBodyLength();
191 stringstream headerData;
192 string path = r->path();
193 assert(!path.empty());
194 string query = r->query();
// With a proxy configured the request line carries the absolute URL.
197 if (!client->proxyHost().empty()) {
198 path = r->scheme() + "://" + r->host() + r->path();
// URL-encoded requests send the query string as the body; everything else
// appends the query to the path and only emits Content-Length/Content-Type
// when requestBodyBytesToSend is non-negative.
201 if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
202 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
// NOTE(review): std::string::substr(1) throws std::out_of_range on an empty
// string — confirm query is never empty on this path.
203 bodyData = query.substr(1); // URL-encode, drop the leading '?'
204 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
205 headerData << "Content-Length:" << bodyData.size() << "\r\n";
207 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
208 if (requestBodyBytesToSend >= 0) {
209 headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
210 headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
214 headerData << "Host: " << r->hostAndPort() << "\r\n";
215 headerData << "User-Agent:" << client->userAgent() << "\r\n";
// Advertises the two content encodings expandCompressedData() can decode.
216 headerData << "Accept-Encoding: deflate, gzip\r\n";
217 if (!client->proxyAuth().empty()) {
218 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
// Per-request headers supplied by the caller.
221 BOOST_FOREACH(string h, r->requestHeaders()) {
222 headerData << h << ": " << r->header(h) << "\r\n";
225 headerData << "\r\n"; // final CRLF to terminate the headers
226 if (!bodyData.empty()) {
227 headerData << bodyData;
230 bool ok = push(headerData.str().c_str());
232 // we've over-stuffed the socket, give up for now, let things
233 // drain down before trying to start any more requests.
// Send the request body piece by piece from the request object; a failed
// bufferSend() puts the connection into STATE_SOCKET_ERROR. The buffer
// declaration and some branch lines are elided from this excerpt.
237 while (requestBodyBytesToSend > 0) {
240 r->getBodyData(buf, len);
242 requestBodyBytesToSend -= len;
243 if (!bufferSend(buf, len)) {
244 SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
245 state = STATE_SOCKET_ERROR;
249 SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
254 //std::cout << "did send request:" << r->url() << std::endl;
255 // successfully sent, remove from queue, and maybe send the next
256 queuedRequests.pop_front();
257 sentRequests.push_back(r);
259 // pipelining, let's maybe send the next request right away
260 tryStartNextRequest();
// Receive `n` bytes at `s` from the channel. While a body is being read
// (plain or chunked) the bytes go to the active request, through the
// decompressor when the content is gzip/deflate; in every other state they
// are appended to `buffer` for line-oriented parsing.
263 virtual void collectIncomingData(const char* s, int n)
266 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
267 if (contentGZip || contentDeflate) {
268 expandCompressedData(s, n);
270 activeRequest->processBodyBytes(s, n);
273 buffer += string(s, n);
// Run `n` newly received compressed bytes at `s` through zlib and pass any
// decompressed output to the active request. Input that inflate() has not
// consumed yet (zlib.avail_in bytes at zlib.next_in) is kept and placed in
// front of the new data inside zlibInflateBuffer.
278 void expandCompressedData(const char* s, int n)
280 int reqSize = n + zlib.avail_in;
// Grow the staging buffer when pending plus new bytes do not fit.
281 if (reqSize > zlibInflateBufferSize) {
// NOTE(review): the malloc() result is used unchecked, and zlib.next_in may
// still be null (with avail_in == 0) on the first call after the z_stream
// was memset to zero — confirm passing it to memcpy is acceptable.
283 unsigned char* newBuf = (unsigned char*) malloc(reqSize);
284 memcpy(newBuf, zlib.next_in, zlib.avail_in);
285 memcpy(newBuf + zlib.avail_in, s, n);
286 free(zlibInflateBuffer);
287 zlibInflateBuffer = newBuf;
288 zlibInflateBufferSize = reqSize;
290 // important to use memmove here, since it's very likely
291 // the source and destination ranges overlap
292 memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
293 memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
// Point zlib at the combined input and at a fresh output window.
296 zlib.next_in = (unsigned char*) zlibInflateBuffer;
297 zlib.avail_in = reqSize;
298 zlib.next_out = zlibOutputBuffer;
299 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
// While the content is still flagged as gzip, the gzip header has to be
// consumed first; what happens on a false return is elided from this excerpt.
301 if (contentGZip && !handleGZipHeader()) {
// Keep inflating until some output is produced or the input is used up
// (see the loop condition); the loop opener is elided from this excerpt.
307 int result = inflate(&zlib, Z_NO_FLUSH);
308 if (result == Z_OK || result == Z_STREAM_END) {
311 SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
315 writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
316 if (result == Z_STREAM_END) {
319 } while ((writtenSize == 0) && (zlib.avail_in > 0));
321 if (writtenSize > 0) {
322 activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
// Try to skip the gzip member header at the front of zlibInflateBuffer.
// Returns false while the header is incomplete and also when the magic or
// method bytes do not match. Once the whole header is present, zlib.next_in
// is advanced past it and contentDeflate is set so the rest is decoded as
// deflate (the final return statement is elided from this excerpt).
// NOTE(review): an invalid header and "need more bytes" both return false,
// so the caller cannot tell them apart — confirm this is intended.
326 bool handleGZipHeader()
328 // we clear this down to contentDeflate once the GZip header has been seen
329 if (zlib.avail_in < GZIP_HEADER_SIZE) {
330 return false; // need more header bytes
333 if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
334 (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
335 (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
337 return false; // invalid GZip header
// Byte 3 is the flags byte; each optional header section below is present
// only when its bit is set.
340 char flags = zlibInflateBuffer[3];
// NOTE(review): gzipHeaderSize is a signed int compared against the
// unsigned zlib.avail_in below — confirm the mixed comparisons are benign.
341 int gzipHeaderSize = GZIP_HEADER_SIZE;
342 if (flags & GZIP_HEADER_FEXTRA) {
344 if (zlib.avail_in < gzipHeaderSize) {
345 return false; // need more header bytes
// NOTE(review): GZIP_HEADER_FEXTRA is the flag mask (1 << 2 == 4) but is
// used here as a byte offset. RFC 1952 places the 2-byte XLEN field
// directly after the 10-byte fixed header, so this looks like it reads the
// wrong bytes; the cast may also be an unaligned access — confirm and fix.
348 unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
// The value is byte-swapped on big-endian hosts.
349 if ( sgIsBigEndian() ) {
350 sgEndianSwap( &extraHeaderBytes );
353 gzipHeaderSize += extraHeaderBytes;
354 if (zlib.avail_in < gzipHeaderSize) {
355 return false; // need more header bytes
// File name and comment are zero-terminated strings; the scans below look
// for the terminator (the increment statements are elided from this excerpt).
359 if (flags & GZIP_HEADER_FNAME) {
361 while (gzipHeaderSize <= zlib.avail_in) {
362 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
363 break; // found terminating NULL character
368 if (flags & GZIP_HEADER_COMMENT) {
370 while (gzipHeaderSize <= zlib.avail_in) {
371 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
372 break; // found terminating NULL character
377 if (flags & GZIP_HEADER_CRC) {
381 if (zlib.avail_in < gzipHeaderSize) {
382 return false; // need more header bytes
// Whole header available: consume it from the zlib input.
385 zlib.next_in += gzipHeaderSize;
386 zlib.avail_in -= gzipHeaderSize;
387 // now we've processed the GZip header, can decode as deflate
389 contentDeflate = true;
// Dispatch on the current connection state; going by the name, this runs
// when NetChat has found the current terminator (NetChat itself is not
// visible here). Most case bodies are elided from this excerpt.
393 virtual void foundTerminator(void)
401 case STATE_GETTING_HEADERS:
406 case STATE_GETTING_BODY:
// A line read while expecting a chunk header is parsed as one.
410 case STATE_GETTING_CHUNKED:
411 processChunkHeader();
// End of a chunk's bytes: go back to line mode and expect the next chunk
// header.
414 case STATE_GETTING_CHUNKED_BYTES:
415 setTerminator("\r\n");
416 state = STATE_GETTING_CHUNKED;
421 case STATE_GETTING_TRAILER:
// Idle-timeout check. Non-idle states take an early exit whose statement is
// elided from this excerpt; otherwise reports whether idleTime shows more
// than ten seconds elapsed.
431 bool hasIdleTimeout() const
433 if (state != STATE_IDLE) {
437 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
// Timeout check for connections that are not idle. The idle state takes an
// early exit whose statement is elided from this excerpt; otherwise reports
// whether idleTime shows more than thirty seconds elapsed.
440 bool hasErrorTimeout() const
442 if (state == STATE_IDLE) {
446 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
// True when the connection is in STATE_SOCKET_ERROR.
449 bool hasError() const
451 return (state == STATE_SOCKET_ERROR);
// True when a request is waiting in the queue and fewer than
// MAX_INFLIGHT_REQUESTS requests are currently in flight.
454 bool shouldStartNext() const
456 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
// Open the channel and connect it to host:port, logging the attempt.
// NOTE(review): the function header, the open() call and the return
// statements are elided from this excerpt; presumably this is the body of
// connectToHost(), which tryStartNextRequest() calls — confirm.
461 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
464 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
// A non-zero result from connect() is treated as failure.
468 if (connect(host.c_str(), port) != 0) {
// Handle one header (or trailer) line held in `buffer`. A blank line ends
// the headers: zlib is set up if the content is compressed and the state
// for reading the body is chosen. Any other line is split at the first ':'
// into key and value, interpreted if it affects the transfer, and forwarded
// to the active request.
// NOTE(review): the function header is elided from this excerpt; the name
// of the enclosing method is not visible — confirm.
478 string h = strutils::simplify(buffer);
479 if (h.empty()) { // blank line terminates headers
// Compressed content: start from a zeroed z_stream, allocate the output
// buffer on first use and initialise raw-deflate decoding.
482 if (contentGZip || contentDeflate) {
483 memset(&zlib, 0, sizeof(z_stream));
484 if (!zlibOutputBuffer) {
485 zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
488 // NULLs means we'll get default alloc+free methods
489 // which is absolutely fine
490 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
491 zlib.next_out = zlibOutputBuffer;
492 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
493 SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
// Choose how the body will be read: chunked, no body at all, or a counted /
// read-until-close body.
497 if (chunkedTransfer) {
498 state = STATE_GETTING_CHUNKED;
499 } else if (noMessageBody || (bodyTransferSize == 0)) {
500 // force the state to GETTING_BODY, to simplify logic in
501 // responseComplete and handleClose
502 state = STATE_GETTING_BODY;
505 setByteCount(bodyTransferSize); // may be -1, that's fine
506 state = STATE_GETTING_BODY;
// NOTE(review): std::string::find returns an unsigned size type that is
// narrowed to int here, and the search runs on `buffer` rather than the
// simplified copy `h` that the warning below logs — confirm both are
// intended.
512 int colonPos = buffer.find(':');
514 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
// The key is matched case-insensitively via its lower-cased copy.
518 string key = strutils::simplify(buffer.substr(0, colonPos));
519 string lkey = boost::to_lower_copy(key);
520 string value = strutils::strip(buffer.substr(colonPos + 1));
522 // only consider these if getting headers (as opposed to trailers
523 // of a chunked transfer)
524 if (state == STATE_GETTING_HEADERS) {
525 if (lkey == "content-length") {
// Content-Length only sets the transfer size when nothing positive has been
// recorded yet; the response length is always passed to the request.
527 int sz = strutils::to_int(value);
528 if (bodyTransferSize <= 0) {
529 bodyTransferSize = sz;
531 activeRequest->setResponseLength(sz);
532 } else if (lkey == "transfer-length") {
533 bodyTransferSize = strutils::to_int(value);
534 } else if (lkey == "transfer-encoding") {
535 processTransferEncoding(value);
536 } else if (lkey == "content-encoding") {
// Values are compared exactly; the statement inside the "gzip" branch is
// elided from this excerpt.
537 if (value == "gzip") {
539 } else if (value == "deflate") {
540 contentDeflate = true;
541 } else if (value != "identity") {
542 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
// The lower-cased key and the value are passed on to the request.
547 activeRequest->responseHeader(lkey, value);
// Interpret a Transfer-Encoding header value. Only the exact value "chunked"
// is recognised; a warning is logged for unsupported encodings.
550 void processTransferEncoding(const string& te)
552 if (te == "chunked") {
553 chunkedTransfer = true;
555 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Parse a chunk-size line held in `buffer`. The size is hexadecimal and
// anything from a ';' onward (chunk extensions) is ignored. A size of zero
// starts the trailer; any other size switches to reading that many bytes.
560 void processChunkHeader()
562 if (buffer.empty()) {
563 // blank line after chunk data
568 int semiPos = buffer.find(';');
570 // extensions ignored for the moment
571 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
573 chunkSize = strutils::to_int(buffer, 16);
577 if (chunkSize == 0) { // trailer start
578 state = STATE_GETTING_TRAILER;
582 state = STATE_GETTING_CHUNKED_BYTES;
583 setByteCount(chunkSize);
// Handle one line of the trailer that follows a chunked body. An empty line
// is handled separately; any other line is processed like a normal header
// (both branch bodies are elided from this excerpt).
586 void processTrailer()
588 if (buffer.empty()) {
594 // process as a normal header
// Tell the active request that all of its response headers have been
// received.
598 void headersComplete()
600 activeRequest->responseHeadersComplete();
// Finish the active request: notify the request and the client, remove it
// from sentRequests, then either start the next queued request or leave the
// connection ready for the next response line. Several statements are
// elided from this excerpt.
603 void responseComplete()
605 //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
606 activeRequest->responseComplete();
607 client->requestFinished(this);
// NOTE(review): the body of this branch is elided (presumably zlib
// cleanup). It is keyed on contentDeflate only, so a gzip response whose
// header was never fully parsed would skip it — confirm.
609 if (contentDeflate) {
// The finished request must be the oldest in-flight one.
613 assert(sentRequests.front() == activeRequest);
614 sentRequests.pop_front();
// Read the close flag before the reference to the request is dropped.
615 bool doClose = activeRequest->closeAfterComplete();
616 activeRequest = NULL;
618 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
620 // this will bring us into handleClose() above, which updates
621 // state to STATE_CLOSED
624 // if we have additional requests waiting, try to start them now
625 tryStartNextRequest();
629 if (state != STATE_CLOSED) {
632 setTerminator("\r\n");
// Parser / socket states of a Connection. Some enumerators (for example
// STATE_IDLE, STATE_GETTING_BODY and STATE_SOCKET_ERROR, which the methods
// above use) are elided from this excerpt.
635 enum ConnectionState {
637 STATE_GETTING_HEADERS,
639 STATE_GETTING_CHUNKED,
640 STATE_GETTING_CHUNKED_BYTES,
641 STATE_GETTING_TRAILER,
643 STATE_CLOSED ///< connection should be closed now
// Request whose response is currently being parsed; NULL between responses.
647 Request_ptr activeRequest;
648 ConnectionState state;
// Body size taken from the response headers; reset to -1 (unknown) at the
// start of each response.
652 int bodyTransferSize;
// Time stamp the idle and error timeout checks measure against.
653 SGTimeStamp idleTime;
654 bool chunkedTransfer;
// Bytes of the current request's body that still have to be sent.
656 int requestBodyBytesToSend;
// malloc()-allocated staging buffer for compressed input, and its capacity.
659 unsigned char* zlibInflateBuffer;
660 int zlibInflateBufferSize;
// malloc()-allocated buffer of ZLIB_DECOMPRESS_BUFFER_SIZE bytes that
// receives inflate() output.
661 unsigned char* zlibOutputBuffer;
// Content encoding of the current response; contentDeflate is also set once
// a gzip header has been consumed.
662 bool contentGZip, contentDeflate;
// Requests waiting to be sent, and requests sent but not yet answered.
664 std::list<Request_ptr> queuedRequests;
665 std::list<Request_ptr> sentRequests;
// Default user agent: "SimGear-" followed by SG_STRINGIZE(SIMGEAR_VERSION).
// NOTE(review): the enclosing function header is elided from this excerpt;
// presumably this is the Client constructor — confirm.
670 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
// Poll the network channels with `waitTimeout`, then walk every connection.
// Connections that are idle-timed-out, in error, or error-timed-out are
// removed from the map; the others get a chance to start their next queued
// request. Some statements (including the iterator advance on the keep path)
// are elided from this excerpt.
673 void Client::update(int waitTimeout)
675 NetChannel::poll(waitTimeout);
677 ConnectionDict::iterator it = _connections.begin();
678 for (; it != _connections.end(); ) {
679 if (it->second->hasIdleTimeout() || it->second->hasError() ||
680 it->second->hasErrorTimeout())
682 // connection has been idle for a while, clean it up
683 // (or has an error condition, again clean it up)
// Step the loop iterator forward before erasing, so `it` stays valid.
684 ConnectionDict::iterator del = it++;
686 _connections.erase(del);
688 if (it->second->shouldStartNext()) {
689 it->second->tryStartNextRequest();
694 } // of connection iteration
// Queue request `r` on the connection for its server, creating that
// connection on first use. Connections are keyed by the string
// "<host>-<port>".
697 void Client::makeRequest(const Request_ptr& r)
699 string host = r->host();
700 int port = r->port();
// The body of the proxy branch is elided from this excerpt; presumably it
// redirects host/port to the proxy — confirm.
701 if (!_proxy.empty()) {
707 ss << host << "-" << port;
708 string connectionId = ss.str();
// First request for this host/port: create the connection and register it.
710 if (_connections.find(connectionId) == _connections.end()) {
711 Connection* con = new Connection(this);
712 con->setServer(host, port);
713 _connections[connectionId] = con;
716 _connections[connectionId]->queueRequest(r);
719 void Client::requestFinished(Connection* con)
724 void Client::setUserAgent(const string& ua)
729 void Client::setProxy(const string& proxy, int port, const string& auth)
736 } // of namespace HTTP
738 } // of namespace simgear