X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=31ea555a9738ab6a5f373b88176da71a82e0d949;hb=201cb61f842ef50a19438e3872ba22e588fa1afc;hp=fb1b812faaeb4d91dcbb5c9a4e0b4f1cfa0c33e5;hpb=ef48baafcc88e53ef252389beb2797161ef7d9be;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index fb1b812f..31ea555a 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -3,11 +3,15 @@ #include #include #include +#include #include #include +#include + #include +#include #include #include #include @@ -25,9 +29,6 @@ using std::string; using std::stringstream; using std::vector; -//#include -//using namespace std; - namespace simgear { @@ -35,18 +36,47 @@ namespace HTTP { extern const int DEFAULT_HTTP_PORT = 80; - +const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; +const unsigned int MAX_INFLIGHT_REQUESTS = 32; +const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024; +const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS; + +// see http://www.ietf.org/rfc/rfc1952.txt for these values and +// detailed description of the logic +const int GZIP_HEADER_ID1 = 31; +const int GZIP_HEADER_ID2 = 139; +const int GZIP_HEADER_METHOD_DEFLATE = 8; +const int GZIP_HEADER_SIZE = 10; +const int GZIP_HEADER_FEXTRA = 1 << 2; +const int GZIP_HEADER_FNAME = 1 << 3; +const int GZIP_HEADER_COMMENT = 1 << 4; +const int GZIP_HEADER_CRC = 1 << 1; + class Connection : public NetChat { public: Connection(Client* pr) : client(pr), state(STATE_CLOSED), - port(DEFAULT_HTTP_PORT) + port(DEFAULT_HTTP_PORT), + zlibInflateBuffer(NULL), + zlibInflateBufferSize(0), + zlibOutputBuffer(NULL) { } + virtual ~Connection() + { + if (zlibInflateBuffer) { + free(zlibInflateBuffer); + } + + if (zlibOutputBuffer) { + free(zlibOutputBuffer); + } + } + void setServer(const string& h, short p) { host = h; @@ -67,7 +97,7 @@ public: } virtual void handleClose() - { + { NetChat::handleClose(); if ((state == STATE_GETTING_BODY) && activeRequest) { @@ -78,78 +108,266 @@ public: } else { state = STATE_CLOSED; } + + if (sentRequests.empty()) { + return; + } + + // restore sent requests to the queue, so they will be re-sent + // when the connection opens again + queuedRequests.insert(queuedRequests.begin(), + sentRequests.begin(), sentRequests.end()); + sentRequests.clear(); } void queueRequest(const Request_ptr& r) { - if (!activeRequest) { - startRequest(r); - } else { - queuedRequests.push_back(r); - } + queuedRequests.push_back(r); + tryStartNextRequest(); } - void startRequest(const Request_ptr& r) + void beginResponse() { - if (state == STATE_CLOSED) { - if (!connectToHost()) { - return; - } - - state = STATE_IDLE; - } - - activeRequest = r; - state = STATE_SENT_REQUEST; - bodyTransferSize = -1; - chunkedTransfer = false; - setTerminator("\r\n"); - - stringstream headerData; - string path = r->path(); - if (!client->proxyHost().empty()) { - path = r->url(); - } + assert(!sentRequests.empty()); + + activeRequest = sentRequests.front(); + activeRequest->responseStart(buffer); + state = STATE_GETTING_HEADERS; + buffer.clear(); + if (activeRequest->responseCode() == 204) { + noMessageBody = true; + } else if (activeRequest->method() == "HEAD") { + noMessageBody = true; + } else { + noMessageBody = false; + } - headerData << r->method() << " " << path << " HTTP/1.1\r\n"; - headerData << "Host: " << r->hostAndPort() << "\r\n"; - headerData << "User-Agent:" << client->userAgent() << "\r\n"; - if (!client->proxyAuth().empty()) { - headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; - } - - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; - } + bodyTransferSize = -1; + chunkedTransfer = false; + contentGZip = contentDeflate = false; + } + + void tryStartNextRequest() + { + if (queuedRequests.empty()) { + idleTime.stamp(); + return; + } + + if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + return; + } + + if (state == STATE_CLOSED) { + if (!connectToHost()) { + return; + } + + setTerminator("\r\n"); + state = STATE_IDLE; + } + + Request_ptr r = queuedRequests.front(); + requestBodyBytesToSend = r->requestBodyLength(); + + stringstream headerData; + string path = r->path(); + string query = r->query(); + string bodyData; + + if (!client->proxyHost().empty()) { + path = r->scheme() + "://" + r->host() + r->path(); + } - headerData << "\r\n"; // final CRLF to terminate the headers + if (r->method() == "POST") { + headerData << r->method() << " " << path << " HTTP/1.1\r\n"; + bodyData = query.substr(1); // URL-encode, drop the leading '?' + headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; + headerData << "Content-Length:" << bodyData.size() << "\r\n"; + } else { + headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; + if (requestBodyBytesToSend >= 0) { + headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; + headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + } + } + + headerData << "Host: " << r->hostAndPort() << "\r\n"; + headerData << "User-Agent:" << client->userAgent() << "\r\n"; + headerData << "Accept-Encoding: deflate, gzip\r\n"; + if (!client->proxyAuth().empty()) { + headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; + } - // TODO - add request body support for PUT, etc operations + BOOST_FOREACH(string h, r->requestHeaders()) { + headerData << h << ": " << r->header(h) << "\r\n"; + } - bool ok = push(headerData.str().c_str()); - if (!ok) { - SG_LOG(SG_IO, SG_WARN, "HTTP writing to socket failed"); + headerData << "\r\n"; // final CRLF to terminate the headers + if (!bodyData.empty()) { + headerData << bodyData; + } + + bool ok = push(headerData.str().c_str()); + if (!ok) { + // we've over-stuffed the socket, give up for now, let things + // drain down before trying to start any more requests. + return; + } + + while (requestBodyBytesToSend > 0) { + char buf[4096]; + int len = 4096; + r->getBodyData(buf, len); + if (len > 0) { + requestBodyBytesToSend -= len; + if (!bufferSend(buf, len)) { + SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); state = STATE_SOCKET_ERROR; return; - } + } + } else { + SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported"); + break; + } + } + + //std::cout << "did send request:" << r->url() << std::endl; + // successfully sent, remove from queue, and maybe send the next + queuedRequests.pop_front(); + sentRequests.push_back(r); + + // pipelining, let's maybe send the next request right away + tryStartNextRequest(); } virtual void collectIncomingData(const char* s, int n) { + idleTime.stamp(); if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { + if (contentGZip || contentDeflate) { + expandCompressedData(s, n); + } else { activeRequest->processBodyBytes(s, n); + } } else { buffer += string(s, n); } } + + + void expandCompressedData(const char* s, int n) + { + int reqSize = n + zlib.avail_in; + if (reqSize > zlibInflateBufferSize) { + // reallocate + unsigned char* newBuf = (unsigned char*) malloc(reqSize); + memcpy(newBuf, zlib.next_in, zlib.avail_in); + memcpy(newBuf + zlib.avail_in, s, n); + free(zlibInflateBuffer); + zlibInflateBuffer = newBuf; + zlibInflateBufferSize = reqSize; + } else { + // important to use memmove here, since it's very likely + // the source and destination ranges overlap + memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in); + memcpy(zlibInflateBuffer + zlib.avail_in, s, n); + } + + zlib.next_in = (unsigned char*) zlibInflateBuffer; + zlib.avail_in = reqSize; + zlib.next_out = zlibOutputBuffer; + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + + if (contentGZip) { + // we clear this down to contentDeflate once the GZip header has been seen + if (reqSize < GZIP_HEADER_SIZE) { + return; // need more header bytes + } + + if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) || + (zlibInflateBuffer[1] != GZIP_HEADER_ID2) || + (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE)) + { + return; // invalid GZip header + } + + char flags = zlibInflateBuffer[3]; + int gzipHeaderSize = GZIP_HEADER_SIZE; + if (flags & GZIP_HEADER_FEXTRA) { + gzipHeaderSize += 2; + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + + unsigned short extraHeaderBytes = *(reinterpret_cast(zlibInflateBuffer + GZIP_HEADER_FEXTRA)); + if ( sgIsBigEndian() ) { + sgEndianSwap( &extraHeaderBytes ); + } + + gzipHeaderSize += extraHeaderBytes; + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + } + + if (flags & GZIP_HEADER_FNAME) { + gzipHeaderSize++; + while (gzipHeaderSize <= reqSize) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } + } + + if (flags & GZIP_HEADER_COMMENT) { + gzipHeaderSize++; + while (gzipHeaderSize <= reqSize) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } + } + + if (flags & GZIP_HEADER_CRC) { + gzipHeaderSize += 2; + } + + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + + zlib.next_in += gzipHeaderSize; + zlib.avail_in = reqSize - gzipHeaderSize; + // now we've processed the GZip header, can decode as deflate + contentGZip = false; + contentDeflate = true; + } + + int writtenSize = 0; + do { + int result = inflate(&zlib, Z_NO_FLUSH); + if (result == Z_OK || result == Z_STREAM_END) { + + } else { + SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result); + return; + } + + writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out; + } while ((writtenSize == 0) && (zlib.avail_in > 0)); + + if (writtenSize > 0) { + activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize); + } + } virtual void foundTerminator(void) { + idleTime.stamp(); switch (state) { - case STATE_SENT_REQUEST: - activeRequest->responseStart(buffer); - state = STATE_GETTING_HEADERS; - buffer.clear(); + case STATE_IDLE: + beginResponse(); break; case STATE_GETTING_HEADERS: @@ -189,6 +407,15 @@ public: return idleTime.elapsedMSec() > 1000 * 10; // ten seconds } + + bool hasErrorTimeout() const + { + if (state == STATE_IDLE) { + return false; + } + + return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds + } bool hasError() const { @@ -197,15 +424,7 @@ public: bool shouldStartNext() const { - return !activeRequest && !queuedRequests.empty() && - ((state == STATE_CLOSED) || (state == STATE_IDLE)); - } - - void startNext() - { - Request_ptr next = queuedRequests.front(); - queuedRequests.pop_front(); - startRequest(next); + return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); } private: bool connectToHost() @@ -231,16 +450,36 @@ private: if (h.empty()) { // blank line terminates headers headersComplete(); + if (contentGZip || contentDeflate) { + memset(&zlib, 0, sizeof(z_stream)); + if (!zlibOutputBuffer) { + zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE); + } + + // NULLs means we'll get default alloc+free methods + // which is absolutely fine + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + zlib.next_out = zlibOutputBuffer; + if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) { + SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed"); + } + } + if (chunkedTransfer) { state = STATE_GETTING_CHUNKED; + } else if (noMessageBody || (bodyTransferSize == 0)) { + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose + state = STATE_GETTING_BODY; + responseComplete(); } else { - setByteCount(bodyTransferSize); // may be -1, that's fine + setByteCount(bodyTransferSize); // may be -1, that's fine state = STATE_GETTING_BODY; } return; } - + int colonPos = buffer.find(':'); if (colonPos < 0) { SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); @@ -255,6 +494,7 @@ private: // of a chunked transfer) if (state == STATE_GETTING_HEADERS) { if (lkey == "content-length") { + int sz = strutils::to_int(value); if (bodyTransferSize <= 0) { bodyTransferSize = sz; @@ -264,6 +504,14 @@ private: bodyTransferSize = strutils::to_int(value); } else if (lkey == "transfer-encoding") { processTransferEncoding(value); + } else if (lkey == "content-encoding") { + if (value == "gzip") { + contentGZip = true; + } else if (value == "deflate") { + contentDeflate = true; + } else if (value != "identity") { + SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value); + } } } @@ -325,37 +573,38 @@ private: void responseComplete() { + //std::cout << "responseComplete:" << activeRequest->url() << std::endl; activeRequest->responseComplete(); client->requestFinished(this); - //cout << "response complete: " << activeRequest->url() << endl; - + + if (contentDeflate) { + inflateEnd(&zlib); + } + + assert(sentRequests.front() == activeRequest); + sentRequests.pop_front(); bool doClose = activeRequest->closeAfterComplete(); activeRequest = NULL; - if (state == STATE_GETTING_BODY) { - state = STATE_IDLE; + + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { if (doClose) { - // this will bring us into handleClose() above, which updates - // state to STATE_CLOSED - close(); - } + // this will bring us into handleClose() above, which updates + // state to STATE_CLOSED + close(); + + // if we have additional requests waiting, try to start them now + tryStartNextRequest(); + } } - setTerminator("\r\n"); - - // if we have more requests, and we're idle, can start the next - // request immediately. Note we cannot do this if we're in STATE_CLOSED, - // since NetChannel::close cleans up state after calling handleClose; - // instead we pick up waiting requests in update() - if (!queuedRequests.empty() && (state == STATE_IDLE)) { - startNext(); - } else { - idleTime.stamp(); - } + if (state != STATE_CLOSED) { + state = STATE_IDLE; + } + setTerminator("\r\n"); } enum ConnectionState { STATE_IDLE = 0, - STATE_SENT_REQUEST, STATE_GETTING_HEADERS, STATE_GETTING_BODY, STATE_GETTING_CHUNKED, @@ -374,8 +623,17 @@ private: int bodyTransferSize; SGTimeStamp idleTime; bool chunkedTransfer; - + bool noMessageBody; + int requestBodyBytesToSend; + + z_stream zlib; + unsigned char* zlibInflateBuffer; + int zlibInflateBufferSize; + unsigned char* zlibOutputBuffer; + bool contentGZip, contentDeflate; + std::list queuedRequests; + std::list sentRequests; }; Client::Client() @@ -383,22 +641,24 @@ Client::Client() setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); } -void Client::update() +void Client::update(int waitTimeout) { - NetChannel::poll(); + NetChannel::poll(waitTimeout); ConnectionDict::iterator it = _connections.begin(); for (; it != _connections.end(); ) { - if (it->second->hasIdleTimeout() || it->second->hasError()) { + if (it->second->hasIdleTimeout() || it->second->hasError() || + it->second->hasErrorTimeout()) + { // connection has been idle for a while, clean it up // (or has an error condition, again clean it up) - SG_LOG(SG_IO, SG_INFO, "cleaning up " << it->second); ConnectionDict::iterator del = it++; delete del->second; _connections.erase(del); } else { if (it->second->shouldStartNext()) { - it->second->startNext(); + SG_LOG(SG_IO, SG_INFO, "should start next, hmm"); + it->second->tryStartNextRequest(); } ++it;