From 71be1a9e2820cabb91aa3acbdf9801459618b126 Mon Sep 17 00:00:00 2001 From: James Turner Date: Wed, 15 Aug 2012 11:42:12 +0100 Subject: [PATCH] HTTP enhancements. Support content-encoding and improve pipelining support. --- simgear/io/HTTPClient.cxx | 442 ++++++++++++++++++++++++++++--------- simgear/io/HTTPClient.hxx | 2 +- simgear/io/HTTPRequest.cxx | 16 ++ simgear/io/HTTPRequest.hxx | 18 ++ simgear/io/test_HTTP.cxx | 68 +++++- 5 files changed, 437 insertions(+), 109 deletions(-) diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index cf19589e..99fb67b3 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -3,11 +3,15 @@ #include #include #include +#include #include #include +#include + #include +#include #include #include #include @@ -33,18 +37,46 @@ namespace HTTP extern const int DEFAULT_HTTP_PORT = 80; const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; - +const int MAX_INFLIGHT_REQUESTS = 32; +const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024; +const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS; + +// see http://www.ietf.org/rfc/rfc1952.txt for these values and +// detailed description of the logic +const int GZIP_HEADER_ID1 = 31; +const int GZIP_HEADER_ID2 = 139; +const int GZIP_HEADER_METHOD_DEFLATE = 8; +const int GZIP_HEADER_SIZE = 10; +const int GZIP_HEADER_FEXTRA = 1 << 2; +const int GZIP_HEADER_FNAME = 1 << 3; +const int GZIP_HEADER_COMMENT = 1 << 4; +const int GZIP_HEADER_CRC = 1 << 1; + class Connection : public NetChat { public: Connection(Client* pr) : client(pr), state(STATE_CLOSED), - port(DEFAULT_HTTP_PORT) + port(DEFAULT_HTTP_PORT), + zlibInflateBuffer(NULL), + zlibInflateBufferSize(0), + zlibOutputBuffer(NULL) { } + virtual ~Connection() + { + if (zlibInflateBuffer) { + free(zlibInflateBuffer); + } + + if (zlibOutputBuffer) { + free(zlibOutputBuffer); + } + } + void setServer(const string& h, short p) { host = h; @@ -65,7 +97,7 @@ public: } virtual void handleClose() - { + { NetChat::handleClose(); if ((state == STATE_GETTING_BODY) && activeRequest) { @@ -76,95 +108,266 @@ public: } else { state = STATE_CLOSED; } + + if (sentRequests.empty()) { + return; + } + + // restore sent requests to the queue, so they will be re-sent + // when the connection opens again + queuedRequests.insert(queuedRequests.begin(), + sentRequests.begin(), sentRequests.end()); + sentRequests.clear(); } void queueRequest(const Request_ptr& r) { - if (!activeRequest) { - startRequest(r); + queuedRequests.push_back(r); + tryStartNextRequest(); + } + + void beginResponse() + { + assert(!sentRequests.empty()); + + activeRequest = sentRequests.front(); + activeRequest->responseStart(buffer); + state = STATE_GETTING_HEADERS; + buffer.clear(); + if (activeRequest->responseCode() == 204) { + noMessageBody = true; + } else if (activeRequest->method() == "HEAD") { + noMessageBody = true; + } else { + noMessageBody = false; + } + + bodyTransferSize = -1; + chunkedTransfer = false; + contentGZip = contentDeflate = false; + } + + void tryStartNextRequest() + { + if (queuedRequests.empty()) { + idleTime.stamp(); + return; + } + + if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + return; + } + + if (state == STATE_CLOSED) { + if (!connectToHost()) { + return; + } + + setTerminator("\r\n"); + state = STATE_IDLE; + } + + Request_ptr r = queuedRequests.front(); + requestBodyBytesToSend = r->requestBodyLength(); + + stringstream headerData; + string path = r->path(); + string query = r->query(); + string bodyData; + + if (!client->proxyHost().empty()) { + path = r->scheme() + "://" + r->host() + r->path(); + } + + if (r->method() == "POST") { + headerData << r->method() << " " << path << " HTTP/1.1\r\n"; + bodyData = query.substr(1); // URL-encode, drop the leading '?' + headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; + headerData << "Content-Length:" << bodyData.size() << "\r\n"; + } else { + headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; + if (requestBodyBytesToSend >= 0) { + headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; + headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + } + } + + headerData << "Host: " << r->hostAndPort() << "\r\n"; + headerData << "User-Agent:" << client->userAgent() << "\r\n"; + headerData << "Accept-Encoding: deflate, gzip\r\n"; + if (!client->proxyAuth().empty()) { + headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; + } + + BOOST_FOREACH(string h, r->requestHeaders()) { + headerData << h << ": " << r->header(h) << "\r\n"; + } + + headerData << "\r\n"; // final CRLF to terminate the headers + if (!bodyData.empty()) { + headerData << bodyData; + } + + bool ok = push(headerData.str().c_str()); + if (!ok) { + // we've over-stuffed the socket, give up for now, let things + // drain down before trying to start any more requests. + return; + } + + while (requestBodyBytesToSend > 0) { + char buf[4096]; + int len = 4096; + r->getBodyData(buf, len); + if (len > 0) { + requestBodyBytesToSend -= len; + if (!bufferSend(buf, len)) { + SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); + state = STATE_SOCKET_ERROR; + return; + } } else { - queuedRequests.push_back(r); + SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported"); + break; } + } + + //std::cout << "did send request:" << r->url() << std::endl; + // successfully sent, remove from queue, and maybe send the next + queuedRequests.pop_front(); + sentRequests.push_back(r); + + // pipelining, let's maybe send the next request right away + tryStartNextRequest(); } - void startRequest(const Request_ptr& r) + virtual void collectIncomingData(const char* s, int n) { - if (state == STATE_CLOSED) { - if (!connectToHost()) { - return; - } + idleTime.stamp(); + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { + if (contentGZip || contentDeflate) { + expandCompressedData(s, n); + } else { + activeRequest->processBodyBytes(s, n); + } + } else { + buffer += string(s, n); + } + } + + + void expandCompressedData(const char* s, int n) + { + int reqSize = n + zlib.avail_in; + if (reqSize > zlibInflateBufferSize) { + // reallocate + unsigned char* newBuf = (unsigned char*) malloc(reqSize); + memcpy(newBuf, zlib.next_in, zlib.avail_in); + memcpy(newBuf + zlib.avail_in, s, n); + free(zlibInflateBuffer); + zlibInflateBuffer = newBuf; + zlibInflateBufferSize = reqSize; + } else { + // important to use memmove here, since it's very likely + // the source and destination ranges overlap + memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in); + memcpy(zlibInflateBuffer + zlib.avail_in, s, n); + } - state = STATE_IDLE; + zlib.next_in = (unsigned char*) zlibInflateBuffer; + zlib.avail_in = reqSize; + zlib.next_out = zlibOutputBuffer; + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + + if (contentGZip) { + // we clear this down to contentDeflate once the GZip header has been seen + if (reqSize < GZIP_HEADER_SIZE) { + return; // need more header bytes } - - activeRequest = r; - state = STATE_SENT_REQUEST; - bodyTransferSize = -1; - chunkedTransfer = false; - noMessageBody = (r->method() == "HEAD"); - setTerminator("\r\n"); - stringstream headerData; - string path = r->path(); - string query = r->query(); - string bodyData; + if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) || + (zlibInflateBuffer[1] != GZIP_HEADER_ID2) || + (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE)) + { + return; // invalid GZip header + } - if (!client->proxyHost().empty()) { - path = r->scheme() + "://" + r->host() + r->path(); + char flags = zlibInflateBuffer[3]; + int gzipHeaderSize = GZIP_HEADER_SIZE; + if (flags & GZIP_HEADER_FEXTRA) { + gzipHeaderSize += 2; + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + + unsigned short extraHeaderBytes = *(reinterpret_cast(zlibInflateBuffer + GZIP_HEADER_FEXTRA)); + if ( sgIsBigEndian() ) { + sgEndianSwap( &extraHeaderBytes ); + } + + gzipHeaderSize += extraHeaderBytes; + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } } - - if (r->method() == "POST") { - headerData << r->method() << " " << path << " HTTP/1.1\r\n"; - bodyData = query.substr(1); // URL-encode, drop the leading '?' - headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; - headerData << "Content-Length:" << bodyData.size() << "\r\n"; - } else { - headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; + + if (flags & GZIP_HEADER_FNAME) { + gzipHeaderSize++; + while (gzipHeaderSize <= reqSize) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } } - headerData << "Host: " << r->hostAndPort() << "\r\n"; - headerData << "User-Agent:" << client->userAgent() << "\r\n"; - if (!client->proxyAuth().empty()) { - headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; + if (flags & GZIP_HEADER_COMMENT) { + gzipHeaderSize++; + while (gzipHeaderSize <= reqSize) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } } - - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; + + if (flags & GZIP_HEADER_CRC) { + gzipHeaderSize += 2; } - - headerData << "\r\n"; // final CRLF to terminate the headers - if (!bodyData.empty()) { - headerData << bodyData; + + if (reqSize < gzipHeaderSize) { + return; // need more header bytes } - bool ok = push(headerData.str().c_str()); - if (!ok) { - SG_LOG(SG_IO, SG_WARN, "HTTP writing to socket failed"); - state = STATE_SOCKET_ERROR; - return; - } - } - - virtual void collectIncomingData(const char* s, int n) - { - if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { - activeRequest->processBodyBytes(s, n); + zlib.next_in += gzipHeaderSize; + zlib.avail_in = reqSize - gzipHeaderSize; + // now we've processed the GZip header, can decode as deflate + contentGZip = false; + contentDeflate = true; + } + + int writtenSize = 0; + do { + int result = inflate(&zlib, Z_NO_FLUSH); + if (result == Z_OK || result == Z_STREAM_END) { + } else { - buffer += string(s, n); + SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result); + return; } + + writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out; + } while ((writtenSize == 0) && (zlib.avail_in > 0)); + + if (writtenSize > 0) { + activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize); + } } virtual void foundTerminator(void) { + idleTime.stamp(); switch (state) { - case STATE_SENT_REQUEST: - activeRequest->responseStart(buffer); - state = STATE_GETTING_HEADERS; - buffer.clear(); - if (activeRequest->responseCode() == 204) { - noMessageBody = true; - } - + case STATE_IDLE: + beginResponse(); break; case STATE_GETTING_HEADERS: @@ -204,6 +407,15 @@ public: return idleTime.elapsedMSec() > 1000 * 10; // ten seconds } + + bool hasErrorTimeout() const + { + if (state == STATE_IDLE) { + return false; + } + + return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds + } bool hasError() const { @@ -212,15 +424,7 @@ public: bool shouldStartNext() const { - return !activeRequest && !queuedRequests.empty() && - ((state == STATE_CLOSED) || (state == STATE_IDLE)); - } - - void startNext() - { - Request_ptr next = queuedRequests.front(); - queuedRequests.pop_front(); - startRequest(next); + return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); } private: bool connectToHost() @@ -246,6 +450,21 @@ private: if (h.empty()) { // blank line terminates headers headersComplete(); + if (contentGZip || contentDeflate) { + bzero(&zlib, sizeof(z_stream)); + if (!zlibOutputBuffer) { + zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE); + } + + // NULLs means we'll get default alloc+free methods + // which is absolutely fine + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + zlib.next_out = zlibOutputBuffer; + if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) { + SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed"); + } + } + if (chunkedTransfer) { state = STATE_GETTING_CHUNKED; } else if (noMessageBody || (bodyTransferSize == 0)) { @@ -254,13 +473,13 @@ private: state = STATE_GETTING_BODY; responseComplete(); } else { - setByteCount(bodyTransferSize); // may be -1, that's fine + setByteCount(bodyTransferSize); // may be -1, that's fine state = STATE_GETTING_BODY; } return; } - + int colonPos = buffer.find(':'); if (colonPos < 0) { SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); @@ -275,6 +494,7 @@ private: // of a chunked transfer) if (state == STATE_GETTING_HEADERS) { if (lkey == "content-length") { + int sz = strutils::to_int(value); if (bodyTransferSize <= 0) { bodyTransferSize = sz; @@ -284,6 +504,14 @@ private: bodyTransferSize = strutils::to_int(value); } else if (lkey == "transfer-encoding") { processTransferEncoding(value); + } else if (lkey == "content-encoding") { + if (value == "gzip") { + contentGZip = true; + } else if (value == "deflate") { + contentDeflate = true; + } else if (value != "identity") { + SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value); + } } } @@ -345,36 +573,38 @@ private: void responseComplete() { + //std::cout << "responseComplete:" << activeRequest->url() << std::endl; activeRequest->responseComplete(); client->requestFinished(this); - + + if (contentDeflate) { + inflateEnd(&zlib); + } + + assert(sentRequests.front() == activeRequest); + sentRequests.pop_front(); bool doClose = activeRequest->closeAfterComplete(); activeRequest = NULL; - if (state == STATE_GETTING_BODY) { - state = STATE_IDLE; + + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { if (doClose) { - // this will bring us into handleClose() above, which updates - // state to STATE_CLOSED - close(); - } + // this will bring us into handleClose() above, which updates + // state to STATE_CLOSED + close(); + + // if we have additional requests waiting, try to start them now + tryStartNextRequest(); + } } - setTerminator("\r\n"); - - // if we have more requests, and we're idle, can start the next - // request immediately. Note we cannot do this if we're in STATE_CLOSED, - // since NetChannel::close cleans up state after calling handleClose; - // instead we pick up waiting requests in update() - if (!queuedRequests.empty() && (state == STATE_IDLE)) { - startNext(); - } else { - idleTime.stamp(); - } + if (state != STATE_CLOSED) { + state = STATE_IDLE; + } + setTerminator("\r\n"); } enum ConnectionState { STATE_IDLE = 0, - STATE_SENT_REQUEST, STATE_GETTING_HEADERS, STATE_GETTING_BODY, STATE_GETTING_CHUNKED, @@ -394,8 +624,16 @@ private: SGTimeStamp idleTime; bool chunkedTransfer; bool noMessageBody; - + int requestBodyBytesToSend; + + z_stream zlib; + unsigned char* zlibInflateBuffer; + int zlibInflateBufferSize; + unsigned char* zlibOutputBuffer; + bool contentGZip, contentDeflate; + std::list queuedRequests; + std::list sentRequests; }; Client::Client() @@ -403,22 +641,24 @@ Client::Client() setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); } -void Client::update() +void Client::update(int waitTimeout) { - NetChannel::poll(); + NetChannel::poll(waitTimeout); ConnectionDict::iterator it = _connections.begin(); for (; it != _connections.end(); ) { - if (it->second->hasIdleTimeout() || it->second->hasError()) { + if (it->second->hasIdleTimeout() || it->second->hasError() || + it->second->hasErrorTimeout()) + { // connection has been idle for a while, clean it up // (or has an error condition, again clean it up) - SG_LOG(SG_IO, SG_INFO, "cleaning up " << it->second); ConnectionDict::iterator del = it++; delete del->second; _connections.erase(del); } else { if (it->second->shouldStartNext()) { - it->second->startNext(); + SG_LOG(SG_IO, SG_INFO, "should start next, hmm"); + it->second->tryStartNextRequest(); } ++it; diff --git a/simgear/io/HTTPClient.hxx b/simgear/io/HTTPClient.hxx index 7cc3771f..f949c7be 100644 --- a/simgear/io/HTTPClient.hxx +++ b/simgear/io/HTTPClient.hxx @@ -18,7 +18,7 @@ class Client public: Client(); - void update(); + void update(int waitTimeout = 0); void makeRequest(const Request_ptr& r); diff --git a/simgear/io/HTTPRequest.cxx b/simgear/io/HTTPRequest.cxx index 84855a4c..0e789e5c 100644 --- a/simgear/io/HTTPRequest.cxx +++ b/simgear/io/HTTPRequest.cxx @@ -218,6 +218,22 @@ bool Request::closeAfterComplete() const // for non HTTP/1.1 connections, assume server closes return _willClose || (_responseVersion != HTTP_1_1); } + +int Request::requestBodyLength() const +{ + return -1; +} + +std::string Request::requestBodyType() const +{ + return "text/plain"; +} + +void Request::getBodyData(char*, int& count) const +{ + count = 0; + return; +} } // of namespace HTTP diff --git a/simgear/io/HTTPRequest.hxx b/simgear/io/HTTPRequest.hxx index 8d8a2270..0c6e2685 100644 --- a/simgear/io/HTTPRequest.hxx +++ b/simgear/io/HTTPRequest.hxx @@ -43,7 +43,25 @@ public: void setResponseLength(unsigned int l); virtual unsigned int responseLength() const; + + /** + * Query the size of the request body. -1 (the default value) means no + * request body + */ + virtual int requestBodyLength() const; + /** + * Retrieve the body data bytes. Will be passed the maximum body bytes + * to return in the buffer, and should update count with the actual number + * of bytes written. + */ + virtual void getBodyData(char* s, int& count) const; + + /** + * retrieve the request body content type. Default is text/plain + */ + virtual std::string requestBodyType() const; + /** * running total of body bytes received so far. Can be used * to generate a completion percentage, if the response length is diff --git a/simgear/io/test_HTTP.cxx b/simgear/io/test_HTTP.cxx index dcafdb42..43d660c3 100644 --- a/simgear/io/test_HTTP.cxx +++ b/simgear/io/test_HTTP.cxx @@ -22,6 +22,14 @@ using std::stringstream; using namespace simgear; const char* BODY1 = "The quick brown fox jumps over a lazy dog."; +const char* BODY3 = "Cras ut neque nulla. Duis ut velit neque, sit amet " +"pharetra risus. In est ligula, lacinia vitae congue in, sollicitudin at " +"libero. Mauris pharetra pretium elit, nec placerat dui semper et. Maecenas " +"magna magna, placerat sed luctus ac, commodo et ligula. Mauris at purus et " +"nisl molestie auctor placerat at quam. Donec sapien magna, venenatis sed " +"iaculis id, fringilla vel arcu. Duis sed neque nisi. Cras a arcu sit amet " +"risus ultrices varius. Integer sagittis euismod dui id varius. Cras vel " +"justo gravida metus."; const unsigned int body2Size = 8 * 1024; char body2[body2Size]; @@ -29,7 +37,7 @@ char body2[body2Size]; #define COMPARE(a, b) \ if ((a) != (b)) { \ cerr << "failed:" << #a << " != " << #b << endl; \ - cerr << "\tgot:" << a << endl; \ + cerr << "\tgot:'" << a << "'" << endl; \ exit(1); \ } @@ -91,6 +99,7 @@ protected: virtual void gotBodyData(const char* s, int n) { + //std::cout << "got body data:'" << string(s, n) << "'" <bodyData, string(BODY1)); } + { + TestRequest* tr = new TestRequest("http://localhost:2000/testLorem"); + HTTP::Request_ptr own(tr); + cl.makeRequest(tr); + + waitForComplete(&cl, tr); + COMPARE(tr->responseCode(), 200); + COMPARE(tr->responseReason(), string("OK")); + COMPARE(tr->responseLength(), strlen(BODY3)); + COMPARE(tr->responseBytesReceived(), strlen(BODY3)); + COMPARE(tr->bodyData, string(BODY3)); + } + { TestRequest* tr = new TestRequest("http://localhost:2000/test_args?foo=abc&bar=1234&username=johndoe"); HTTP::Request_ptr own(tr); @@ -590,6 +632,8 @@ int main(int argc, char* argv[]) } // pipelining + cout << "testing HTTP 1.1 pipelineing" << endl; + { cl.setProxy("", 80); TestRequest* tr = new TestRequest("http://localhost:2000/test1"); @@ -597,7 +641,7 @@ int main(int argc, char* argv[]) cl.makeRequest(tr); - TestRequest* tr2 = new TestRequest("http://localhost:2000/test1"); + TestRequest* tr2 = new TestRequest("http://localhost:2000/testLorem"); HTTP::Request_ptr own2(tr2); cl.makeRequest(tr2); @@ -609,7 +653,11 @@ int main(int argc, char* argv[]) VERIFY(tr->complete); VERIFY(tr2->complete); COMPARE(tr->bodyData, string(BODY1)); - COMPARE(tr2->bodyData, string(BODY1)); + + COMPARE(tr2->responseLength(), strlen(BODY3)); + COMPARE(tr2->responseBytesReceived(), strlen(BODY3)); + COMPARE(tr2->bodyData, string(BODY3)); + COMPARE(tr3->bodyData, string(BODY1)); } @@ -633,8 +681,14 @@ int main(int argc, char* argv[]) waitForComplete(&cl, tr3); VERIFY(tr->complete); VERIFY(tr2->complete); + + COMPARE(tr->responseLength(), strlen(BODY1)); + COMPARE(tr->responseBytesReceived(), strlen(BODY1)); COMPARE(tr->bodyData, string(BODY1)); - COMPARE(tr2->bodyData, string(BODY1)); + + COMPARE(tr2->responseLength(), strlen(BODY3)); + COMPARE(tr2->responseBytesReceived(), strlen(BODY3)); + COMPARE(tr2->bodyData, string(BODY3)); COMPARE(tr3->bodyData, string(BODY1)); } -- 2.39.5