X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=31ea555a9738ab6a5f373b88176da71a82e0d949;hb=201cb61f842ef50a19438e3872ba22e588fa1afc;hp=3ea5f02a127b8890c0e99ad748e7ec3d1817271a;hpb=115531e944f51c3295f69f3ed361bf421515cc58;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index 3ea5f02a..31ea555a 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -3,11 +3,15 @@ #include #include #include +#include #include #include +#include + #include +#include #include #include #include @@ -32,36 +36,59 @@ namespace HTTP { extern const int DEFAULT_HTTP_PORT = 80; - +const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; +const unsigned int MAX_INFLIGHT_REQUESTS = 32; +const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024; +const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS; + +// see http://www.ietf.org/rfc/rfc1952.txt for these values and +// detailed description of the logic +const int GZIP_HEADER_ID1 = 31; +const int GZIP_HEADER_ID2 = 139; +const int GZIP_HEADER_METHOD_DEFLATE = 8; +const int GZIP_HEADER_SIZE = 10; +const int GZIP_HEADER_FEXTRA = 1 << 2; +const int GZIP_HEADER_FNAME = 1 << 3; +const int GZIP_HEADER_COMMENT = 1 << 4; +const int GZIP_HEADER_CRC = 1 << 1; + class Connection : public NetChat { public: Connection(Client* pr) : client(pr), - state(STATE_IDLE) + state(STATE_CLOSED), + port(DEFAULT_HTTP_PORT), + zlibInflateBuffer(NULL), + zlibInflateBufferSize(0), + zlibOutputBuffer(NULL) { - setTerminator("\r\n"); + } - bool connectToHost(const string& host, short port) + virtual ~Connection() { - if (!open()) { - SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); - return false; - } - - if (connect(host.c_str(), port) != 0) { - return false; - } - - return true; + if (zlibInflateBuffer) { + free(zlibInflateBuffer); + } + + if (zlibOutputBuffer) { + free(zlibOutputBuffer); + } + } + + void setServer(const string& h, short p) + { + host = h; + port = p; } // socket-level errors virtual void handleError(int error) - { + { NetChat::handleError(error); if (activeRequest) { + SG_LOG(SG_IO, SG_INFO, "HTTP socket error"); activeRequest->setFailure(error, "socket error"); activeRequest = NULL; } @@ -69,62 +96,278 @@ public: state = STATE_SOCKET_ERROR; } - void queueRequest(const Request_ptr& r) - { - if (!activeRequest) { - startRequest(r); + virtual void handleClose() + { + NetChat::handleClose(); + + if ((state == STATE_GETTING_BODY) && activeRequest) { + // force state here, so responseComplete can avoid closing the + // socket again + state = STATE_CLOSED; + responseComplete(); } else { - queuedRequests.push_back(r); + state = STATE_CLOSED; } + + if (sentRequests.empty()) { + return; + } + + // restore sent requests to the queue, so they will be re-sent + // when the connection opens again + queuedRequests.insert(queuedRequests.begin(), + sentRequests.begin(), sentRequests.end()); + sentRequests.clear(); } - void startRequest(const Request_ptr& r) + void queueRequest(const Request_ptr& r) { - activeRequest = r; - state = STATE_IDLE; - bodyTransferSize = 0; - chunkedTransfer = false; - - stringstream headerData; - string path = r->path(); - if (!client->proxyHost().empty()) { - path = r->url(); - } - - headerData << r->method() << " " << path << " HTTP/1.1\r\n"; - headerData << "Host: " << r->hostAndPort() << "\r\n"; - headerData << "User-Agent:" << client->userAgent() << "\r\n"; - if (!client->proxyAuth().empty()) { - headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; - } + queuedRequests.push_back(r); + tryStartNextRequest(); + } + + void beginResponse() + { + assert(!sentRequests.empty()); + + activeRequest = sentRequests.front(); + activeRequest->responseStart(buffer); + state = STATE_GETTING_HEADERS; + buffer.clear(); + if (activeRequest->responseCode() == 204) { + noMessageBody = true; + } else if (activeRequest->method() == "HEAD") { + noMessageBody = true; + } else { + noMessageBody = false; + } - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; - } + bodyTransferSize = -1; + chunkedTransfer = false; + contentGZip = contentDeflate = false; + } + + void tryStartNextRequest() + { + if (queuedRequests.empty()) { + idleTime.stamp(); + return; + } + + if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + return; + } + + if (state == STATE_CLOSED) { + if (!connectToHost()) { + return; + } + + setTerminator("\r\n"); + state = STATE_IDLE; + } + + Request_ptr r = queuedRequests.front(); + requestBodyBytesToSend = r->requestBodyLength(); + + stringstream headerData; + string path = r->path(); + string query = r->query(); + string bodyData; + + if (!client->proxyHost().empty()) { + path = r->scheme() + "://" + r->host() + r->path(); + } - headerData << "\r\n"; // final CRLF to terminate the headers + if (r->method() == "POST") { + headerData << r->method() << " " << path << " HTTP/1.1\r\n"; + bodyData = query.substr(1); // URL-encode, drop the leading '?' + headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; + headerData << "Content-Length:" << bodyData.size() << "\r\n"; + } else { + headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; + if (requestBodyBytesToSend >= 0) { + headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; + headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + } + } + + headerData << "Host: " << r->hostAndPort() << "\r\n"; + headerData << "User-Agent:" << client->userAgent() << "\r\n"; + headerData << "Accept-Encoding: deflate, gzip\r\n"; + if (!client->proxyAuth().empty()) { + headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; + } - // TODO - add request body support for PUT, etc operations + BOOST_FOREACH(string h, r->requestHeaders()) { + headerData << h << ": " << r->header(h) << "\r\n"; + } - push(headerData.str().c_str()); + headerData << "\r\n"; // final CRLF to terminate the headers + if (!bodyData.empty()) { + headerData << bodyData; + } + + bool ok = push(headerData.str().c_str()); + if (!ok) { + // we've over-stuffed the socket, give up for now, let things + // drain down before trying to start any more requests. + return; + } + + while (requestBodyBytesToSend > 0) { + char buf[4096]; + int len = 4096; + r->getBodyData(buf, len); + if (len > 0) { + requestBodyBytesToSend -= len; + if (!bufferSend(buf, len)) { + SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); + state = STATE_SOCKET_ERROR; + return; + } + } else { + SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported"); + break; + } + } + + //std::cout << "did send request:" << r->url() << std::endl; + // successfully sent, remove from queue, and maybe send the next + queuedRequests.pop_front(); + sentRequests.push_back(r); + + // pipelining, let's maybe send the next request right away + tryStartNextRequest(); } virtual void collectIncomingData(const char* s, int n) { + idleTime.stamp(); if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { + if (contentGZip || contentDeflate) { + expandCompressedData(s, n); + } else { activeRequest->processBodyBytes(s, n); + } } else { buffer += string(s, n); } } + + + void expandCompressedData(const char* s, int n) + { + int reqSize = n + zlib.avail_in; + if (reqSize > zlibInflateBufferSize) { + // reallocate + unsigned char* newBuf = (unsigned char*) malloc(reqSize); + memcpy(newBuf, zlib.next_in, zlib.avail_in); + memcpy(newBuf + zlib.avail_in, s, n); + free(zlibInflateBuffer); + zlibInflateBuffer = newBuf; + zlibInflateBufferSize = reqSize; + } else { + // important to use memmove here, since it's very likely + // the source and destination ranges overlap + memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in); + memcpy(zlibInflateBuffer + zlib.avail_in, s, n); + } + + zlib.next_in = (unsigned char*) zlibInflateBuffer; + zlib.avail_in = reqSize; + zlib.next_out = zlibOutputBuffer; + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + + if (contentGZip) { + // we clear this down to contentDeflate once the GZip header has been seen + if (reqSize < GZIP_HEADER_SIZE) { + return; // need more header bytes + } + + if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) || + (zlibInflateBuffer[1] != GZIP_HEADER_ID2) || + (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE)) + { + return; // invalid GZip header + } + + char flags = zlibInflateBuffer[3]; + int gzipHeaderSize = GZIP_HEADER_SIZE; + if (flags & GZIP_HEADER_FEXTRA) { + gzipHeaderSize += 2; + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + + unsigned short extraHeaderBytes = *(reinterpret_cast(zlibInflateBuffer + GZIP_HEADER_FEXTRA)); + if ( sgIsBigEndian() ) { + sgEndianSwap( &extraHeaderBytes ); + } + + gzipHeaderSize += extraHeaderBytes; + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + } + + if (flags & GZIP_HEADER_FNAME) { + gzipHeaderSize++; + while (gzipHeaderSize <= reqSize) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } + } + + if (flags & GZIP_HEADER_COMMENT) { + gzipHeaderSize++; + while (gzipHeaderSize <= reqSize) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } + } + + if (flags & GZIP_HEADER_CRC) { + gzipHeaderSize += 2; + } + + if (reqSize < gzipHeaderSize) { + return; // need more header bytes + } + + zlib.next_in += gzipHeaderSize; + zlib.avail_in = reqSize - gzipHeaderSize; + // now we've processed the GZip header, can decode as deflate + contentGZip = false; + contentDeflate = true; + } + + int writtenSize = 0; + do { + int result = inflate(&zlib, Z_NO_FLUSH); + if (result == Z_OK || result == Z_STREAM_END) { + + } else { + SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result); + return; + } + + writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out; + } while ((writtenSize == 0) && (zlib.avail_in > 0)); + + if (writtenSize > 0) { + activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize); + } + } virtual void foundTerminator(void) { + idleTime.stamp(); switch (state) { case STATE_IDLE: - activeRequest->responseStart(buffer); - state = STATE_GETTING_HEADERS; - buffer.clear(); + beginResponse(); break; case STATE_GETTING_HEADERS: @@ -145,6 +388,7 @@ public: state = STATE_GETTING_CHUNKED; break; + case STATE_GETTING_TRAILER: processTrailer(); buffer.clear(); @@ -163,29 +407,79 @@ public: return idleTime.elapsedMSec() > 1000 * 10; // ten seconds } + + bool hasErrorTimeout() const + { + if (state == STATE_IDLE) { + return false; + } + + return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds + } bool hasError() const { return (state == STATE_SOCKET_ERROR); } + + bool shouldStartNext() const + { + return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); + } private: + bool connectToHost() + { + SG_LOG(SG_IO, SG_INFO, "HTTP connecting to " << host << ":" << port); + + if (!open()) { + SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); + return false; + } + + if (connect(host.c_str(), port) != 0) { + return false; + } + + return true; + } + + void processHeader() { string h = strutils::simplify(buffer); if (h.empty()) { // blank line terminates headers headersComplete(); + if (contentGZip || contentDeflate) { + memset(&zlib, 0, sizeof(z_stream)); + if (!zlibOutputBuffer) { + zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE); + } + + // NULLs means we'll get default alloc+free methods + // which is absolutely fine + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + zlib.next_out = zlibOutputBuffer; + if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) { + SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed"); + } + } + if (chunkedTransfer) { state = STATE_GETTING_CHUNKED; - } else if (bodyTransferSize > 0) { + } else if (noMessageBody || (bodyTransferSize == 0)) { + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose state = STATE_GETTING_BODY; - setByteCount(bodyTransferSize); - } else { responseComplete(); + } else { + setByteCount(bodyTransferSize); // may be -1, that's fine + state = STATE_GETTING_BODY; } + return; } - + int colonPos = buffer.find(':'); if (colonPos < 0) { SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); @@ -200,6 +494,7 @@ private: // of a chunked transfer) if (state == STATE_GETTING_HEADERS) { if (lkey == "content-length") { + int sz = strutils::to_int(value); if (bodyTransferSize <= 0) { bodyTransferSize = sz; @@ -209,6 +504,14 @@ private: bodyTransferSize = strutils::to_int(value); } else if (lkey == "transfer-encoding") { processTransferEncoding(value); + } else if (lkey == "content-encoding") { + if (value == "gzip") { + contentGZip = true; + } else if (value == "deflate") { + contentDeflate = true; + } else if (value != "identity") { + SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value); + } } } @@ -270,20 +573,34 @@ private: void responseComplete() { + //std::cout << "responseComplete:" << activeRequest->url() << std::endl; activeRequest->responseComplete(); client->requestFinished(this); + + if (contentDeflate) { + inflateEnd(&zlib); + } + + assert(sentRequests.front() == activeRequest); + sentRequests.pop_front(); + bool doClose = activeRequest->closeAfterComplete(); activeRequest = NULL; + + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { + if (doClose) { + // this will bring us into handleClose() above, which updates + // state to STATE_CLOSED + close(); + + // if we have additional requests waiting, try to start them now + tryStartNextRequest(); + } + } + if (state != STATE_CLOSED) { state = STATE_IDLE; - setTerminator("\r\n"); - - if (!queuedRequests.empty()) { - Request_ptr next = queuedRequests.front(); - queuedRequests.pop_front(); - startRequest(next); - } else { - idleTime.stamp(); - } + } + setTerminator("\r\n"); } enum ConnectionState { @@ -293,18 +610,30 @@ private: STATE_GETTING_CHUNKED, STATE_GETTING_CHUNKED_BYTES, STATE_GETTING_TRAILER, - STATE_SOCKET_ERROR + STATE_SOCKET_ERROR, + STATE_CLOSED ///< connection should be closed now }; Client* client; Request_ptr activeRequest; ConnectionState state; + string host; + short port; std::string buffer; int bodyTransferSize; SGTimeStamp idleTime; bool chunkedTransfer; - + bool noMessageBody; + int requestBodyBytesToSend; + + z_stream zlib; + unsigned char* zlibInflateBuffer; + int zlibInflateBufferSize; + unsigned char* zlibOutputBuffer; + bool contentGZip, contentDeflate; + std::list queuedRequests; + std::list sentRequests; }; Client::Client() @@ -312,18 +641,26 @@ Client::Client() setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); } -void Client::update() +void Client::update(int waitTimeout) { - NetChannel::poll(); - + NetChannel::poll(waitTimeout); + ConnectionDict::iterator it = _connections.begin(); for (; it != _connections.end(); ) { - if (it->second->hasIdleTimeout() || it->second->hasError()) { + if (it->second->hasIdleTimeout() || it->second->hasError() || + it->second->hasErrorTimeout()) + { // connection has been idle for a while, clean it up + // (or has an error condition, again clean it up) ConnectionDict::iterator del = it++; delete del->second; _connections.erase(del); } else { + if (it->second->shouldStartNext()) { + SG_LOG(SG_IO, SG_INFO, "should start next, hmm"); + it->second->tryStartNextRequest(); + } + ++it; } } // of connecion iteration @@ -344,18 +681,7 @@ void Client::makeRequest(const Request_ptr& r) if (_connections.find(connectionId) == _connections.end()) { Connection* con = new Connection(this); - bool ok = con->connectToHost(host, port); - if (!ok) { - // since NetChannel connect is non-blocking, this failure - // path is unlikely, but still checked for. - SG_LOG(SG_IO, SG_WARN, "unable to connect to host:" - << host << " (port:" << port << ")"); - delete con; - - r->setFailure(-1, "unable to connect to host"); - return; - } - + con->setServer(host, port); _connections[connectionId] = con; }