X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=8a48bff6386deb9e97c75cad39f44f19ae5ccca2;hb=adb7db9229db1d869b254ac18f1471bed464c508;hp=e654ab02dd23f16d608cc1dc24965b3f8d361bca;hpb=04dc28cb3307a2d78b024f533011628dbe5e5a52;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index e654ab02..8a48bff6 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -3,16 +3,28 @@ #include #include #include +#include +#include #include #include +#include + #include +#include #include #include #include +#include +#if defined( HAVE_VERSION_H ) && HAVE_VERSION_H #include "version.h" +#else +# if !defined(SIMGEAR_VERSION) +# define SIMGEAR_VERSION "simgear-development" +# endif +#endif using std::string; using std::stringstream; @@ -24,88 +36,366 @@ namespace simgear namespace HTTP { - +extern const int DEFAULT_HTTP_PORT = 80; +const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; +const unsigned int MAX_INFLIGHT_REQUESTS = 32; +const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024; +const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS; + +// see http://www.ietf.org/rfc/rfc1952.txt for these values and +// detailed description of the logic +const int GZIP_HEADER_ID1 = 31; +const int GZIP_HEADER_ID2 = 139; +const int GZIP_HEADER_METHOD_DEFLATE = 8; +const unsigned int GZIP_HEADER_SIZE = 10; +const int GZIP_HEADER_FEXTRA = 1 << 2; +const int GZIP_HEADER_FNAME = 1 << 3; +const int GZIP_HEADER_COMMENT = 1 << 4; +const int GZIP_HEADER_CRC = 1 << 1; + class Connection : public NetChat { public: Connection(Client* pr) : client(pr), - state(STATE_IDLE) + state(STATE_CLOSED), + port(DEFAULT_HTTP_PORT), + zlibInflateBuffer(NULL), + zlibInflateBufferSize(0), + zlibOutputBuffer(NULL) { - setTerminator("\r\n"); + } - void connectToHost(const string& host) + virtual ~Connection() + { + if (zlibInflateBuffer) { + free(zlibInflateBuffer); + } + + if (zlibOutputBuffer) { + free(zlibOutputBuffer); + } + } + + void setServer(const string& h, short p) { - open(); + host = h; + port = p; + } + + // socket-level errors + virtual void handleError(int error) + { + if (error == ENOENT) { + // name lookup failure + // we won't have an active request yet, so the logic below won't + // fire to actually call setFailure. Let's fail all of the requests + BOOST_FOREACH(Request_ptr req, sentRequests) { + req->setFailure(error, "hostname lookup failure"); + } + + BOOST_FOREACH(Request_ptr req, queuedRequests) { + req->setFailure(error, "hostname lookup failure"); + } + + // name lookup failure, abandon all requests on this connection + sentRequests.clear(); + queuedRequests.clear(); + } + + NetChat::handleError(error); + if (activeRequest) { + SG_LOG(SG_IO, SG_INFO, "HTTP socket error"); + activeRequest->setFailure(error, "socket error"); + activeRequest = NULL; + } + + state = STATE_SOCKET_ERROR; + } + + virtual void handleClose() + { + NetChat::handleClose(); - int colonPos = host.find(':'); - if (colonPos > 0) { - string h = host.substr(0, colonPos); - int port = strutils::to_int(host.substr(colonPos + 1)); - connect(h.c_str(), port); + if ((state == STATE_GETTING_BODY) && activeRequest) { + // force state here, so responseComplete can avoid closing the + // socket again + state = STATE_CLOSED; + responseComplete(); } else { - connect(host.c_str(), 80 /* default port */); + state = STATE_CLOSED; } + + if (sentRequests.empty()) { + return; + } + + // restore sent requests to the queue, so they will be re-sent + // when the connection opens again + queuedRequests.insert(queuedRequests.begin(), + sentRequests.begin(), sentRequests.end()); + sentRequests.clear(); } void queueRequest(const Request_ptr& r) { - if (!activeRequest) { - startRequest(r); - } else { - queuedRequests.push_back(r); - } + queuedRequests.push_back(r); + tryStartNextRequest(); } - void startRequest(const Request_ptr& r) + void beginResponse() { - activeRequest = r; - state = STATE_IDLE; - bodyTransferSize = 0; - - stringstream headerData; - string path = r->path(); - if (!client->proxyHost().empty()) { - path = "http://" + r->host() + path; - } - - int requestTime = 0; - headerData << r->method() << " " << path << " HTTP/1.1 " << client->userAgent() << "\r\n"; - headerData << "Host: " << r->host() << "\r\n"; - headerData << "X-Time: " << requestTime << "\r\n"; - - if (!client->proxyAuth().empty()) { - headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; - } + assert(!sentRequests.empty()); + + activeRequest = sentRequests.front(); + activeRequest->responseStart(buffer); + state = STATE_GETTING_HEADERS; + buffer.clear(); + if (activeRequest->responseCode() == 204) { + noMessageBody = true; + } else if (activeRequest->method() == "HEAD") { + noMessageBody = true; + } else { + noMessageBody = false; + } - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; - } + bodyTransferSize = -1; + chunkedTransfer = false; + contentGZip = contentDeflate = false; + } + + void tryStartNextRequest() + { + if (queuedRequests.empty()) { + idleTime.stamp(); + return; + } + + if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) { + return; + } + + if (state == STATE_CLOSED) { + if (!connectToHost()) { + return; + } + + setTerminator("\r\n"); + state = STATE_IDLE; + } + + Request_ptr r = queuedRequests.front(); + requestBodyBytesToSend = r->requestBodyLength(); + + stringstream headerData; + string path = r->path(); + assert(!path.empty()); + string query = r->query(); + string bodyData; + + if (!client->proxyHost().empty()) { + path = r->scheme() + "://" + r->host() + r->path(); + } - headerData << "\r\n"; // final CRLF to terminate the headers + if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) { + headerData << r->method() << " " << path << " HTTP/1.1\r\n"; + bodyData = query.substr(1); // URL-encode, drop the leading '?' + headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; + headerData << "Content-Length:" << bodyData.size() << "\r\n"; + } else { + headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; + if (requestBodyBytesToSend >= 0) { + headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; + headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + } + } + + headerData << "Host: " << r->hostAndPort() << "\r\n"; + headerData << "User-Agent:" << client->userAgent() << "\r\n"; + headerData << "Accept-Encoding: deflate, gzip\r\n"; + if (!client->proxyAuth().empty()) { + headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; + } - // TODO - add request body support for PUT, etc operations + BOOST_FOREACH(string h, r->requestHeaders()) { + headerData << h << ": " << r->header(h) << "\r\n"; + } - push(headerData.str().c_str()); + headerData << "\r\n"; // final CRLF to terminate the headers + if (!bodyData.empty()) { + headerData << bodyData; + } + + bool ok = push(headerData.str().c_str()); + if (!ok) { + // we've over-stuffed the socket, give up for now, let things + // drain down before trying to start any more requests. + return; + } + + while (requestBodyBytesToSend > 0) { + char buf[4096]; + int len = 4096; + r->getBodyData(buf, len); + if (len > 0) { + requestBodyBytesToSend -= len; + if (!bufferSend(buf, len)) { + SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); + state = STATE_SOCKET_ERROR; + return; + } + } else { + SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported"); + break; + } + } + + //std::cout << "did send request:" << r->url() << std::endl; + // successfully sent, remove from queue, and maybe send the next + queuedRequests.pop_front(); + sentRequests.push_back(r); + + // pipelining, let's maybe send the next request right away + tryStartNextRequest(); } virtual void collectIncomingData(const char* s, int n) { - if (state == STATE_GETTING_BODY) { - activeRequest->gotBodyData(s, n); + idleTime.stamp(); + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { + if (contentGZip || contentDeflate) { + expandCompressedData(s, n); + } else { + activeRequest->processBodyBytes(s, n); + } } else { buffer += string(s, n); } } + + + void expandCompressedData(const char* s, int n) + { + int reqSize = n + zlib.avail_in; + if (reqSize > zlibInflateBufferSize) { + // reallocate + unsigned char* newBuf = (unsigned char*) malloc(reqSize); + memcpy(newBuf, zlib.next_in, zlib.avail_in); + memcpy(newBuf + zlib.avail_in, s, n); + free(zlibInflateBuffer); + zlibInflateBuffer = newBuf; + zlibInflateBufferSize = reqSize; + } else { + // important to use memmove here, since it's very likely + // the source and destination ranges overlap + memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in); + memcpy(zlibInflateBuffer + zlib.avail_in, s, n); + } + + zlib.next_in = (unsigned char*) zlibInflateBuffer; + zlib.avail_in = reqSize; + zlib.next_out = zlibOutputBuffer; + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + + if (contentGZip && !handleGZipHeader()) { + return; + } + + int writtenSize = 0; + do { + int result = inflate(&zlib, Z_NO_FLUSH); + if (result == Z_OK || result == Z_STREAM_END) { + // nothing to do + } else { + SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result); + return; + } + + writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out; + if (result == Z_STREAM_END) { + break; + } + } while ((writtenSize == 0) && (zlib.avail_in > 0)); + + if (writtenSize > 0) { + activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize); + } + } + + bool handleGZipHeader() + { + // we clear this down to contentDeflate once the GZip header has been seen + if (zlib.avail_in < GZIP_HEADER_SIZE) { + return false; // need more header bytes + } + + if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) || + (zlibInflateBuffer[1] != GZIP_HEADER_ID2) || + (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE)) + { + return false; // invalid GZip header + } + + char flags = zlibInflateBuffer[3]; + int gzipHeaderSize = GZIP_HEADER_SIZE; + if (flags & GZIP_HEADER_FEXTRA) { + gzipHeaderSize += 2; + if (zlib.avail_in < gzipHeaderSize) { + return false; // need more header bytes + } + + unsigned short extraHeaderBytes = *(reinterpret_cast(zlibInflateBuffer + GZIP_HEADER_FEXTRA)); + if ( sgIsBigEndian() ) { + sgEndianSwap( &extraHeaderBytes ); + } + + gzipHeaderSize += extraHeaderBytes; + if (zlib.avail_in < gzipHeaderSize) { + return false; // need more header bytes + } + } + + if (flags & GZIP_HEADER_FNAME) { + gzipHeaderSize++; + while (gzipHeaderSize <= zlib.avail_in) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } + } + + if (flags & GZIP_HEADER_COMMENT) { + gzipHeaderSize++; + while (gzipHeaderSize <= zlib.avail_in) { + if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { + break; // found terminating NULL character + } + } + } + + if (flags & GZIP_HEADER_CRC) { + gzipHeaderSize += 2; + } + + if (zlib.avail_in < gzipHeaderSize) { + return false; // need more header bytes + } + + zlib.next_in += gzipHeaderSize; + zlib.avail_in -= gzipHeaderSize; + // now we've processed the GZip header, can decode as deflate + contentGZip = false; + contentDeflate = true; + return true; + } virtual void foundTerminator(void) { + idleTime.stamp(); switch (state) { case STATE_IDLE: - activeRequest->responseStart(buffer); - state = STATE_GETTING_HEADERS; - buffer.clear(); + beginResponse(); break; case STATE_GETTING_HEADERS: @@ -115,36 +405,110 @@ public: case STATE_GETTING_BODY: responseComplete(); - state = STATE_IDLE; - setTerminator("\r\n"); + break; + + case STATE_GETTING_CHUNKED: + processChunkHeader(); + break; - if (!queuedRequests.empty()) { - Request_ptr next = queuedRequests.front(); - queuedRequests.pop_front(); - startRequest(next); - } + case STATE_GETTING_CHUNKED_BYTES: + setTerminator("\r\n"); + state = STATE_GETTING_CHUNKED; + buffer.clear(); + break; + + case STATE_GETTING_TRAILER: + processTrailer(); + buffer.clear(); + break; + + default: break; } } + bool hasIdleTimeout() const + { + if (state != STATE_IDLE) { + return false; + } + + return idleTime.elapsedMSec() > 1000 * 10; // ten seconds + } + + bool hasErrorTimeout() const + { + if (state == STATE_IDLE) { + return false; + } + + return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds + } + + bool hasError() const + { + return (state == STATE_SOCKET_ERROR); + } + + bool shouldStartNext() const + { + return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); + } private: + bool connectToHost() + { + SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port); + + if (!open()) { + SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed"); + return false; + } + + if (connect(host.c_str(), port) != 0) { + return false; + } + + return true; + } + + void processHeader() { string h = strutils::simplify(buffer); if (h.empty()) { // blank line terminates headers headersComplete(); - if (bodyTransferSize > 0) { + if (contentGZip || contentDeflate) { + memset(&zlib, 0, sizeof(z_stream)); + if (!zlibOutputBuffer) { + zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE); + } + + // NULLs means we'll get default alloc+free methods + // which is absolutely fine + zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; + zlib.next_out = zlibOutputBuffer; + if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) { + SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed"); + } + } + + if (chunkedTransfer) { + state = STATE_GETTING_CHUNKED; + } else if (noMessageBody || (bodyTransferSize == 0)) { + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose state = STATE_GETTING_BODY; - setByteCount(bodyTransferSize); - } else { responseComplete(); - state = STATE_IDLE; // no response body, we're done + } else { + setByteCount(bodyTransferSize); // may be -1, that's fine + state = STATE_GETTING_BODY; } + return; } - + int colonPos = buffer.find(':'); if (colonPos < 0) { SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h); @@ -155,15 +519,82 @@ private: string lkey = boost::to_lower_copy(key); string value = strutils::strip(buffer.substr(colonPos + 1)); - if (lkey == "content-length" && (bodyTransferSize <= 0)) { - bodyTransferSize = strutils::to_int(value); - } else if (lkey == "transfer-length") { - bodyTransferSize = strutils::to_int(value); + // only consider these if getting headers (as opposed to trailers + // of a chunked transfer) + if (state == STATE_GETTING_HEADERS) { + if (lkey == "content-length") { + + int sz = strutils::to_int(value); + if (bodyTransferSize <= 0) { + bodyTransferSize = sz; + } + activeRequest->setResponseLength(sz); + } else if (lkey == "transfer-length") { + bodyTransferSize = strutils::to_int(value); + } else if (lkey == "transfer-encoding") { + processTransferEncoding(value); + } else if (lkey == "content-encoding") { + if (value == "gzip") { + contentGZip = true; + } else if (value == "deflate") { + contentDeflate = true; + } else if (value != "identity") { + SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value); + } + } } - + activeRequest->responseHeader(lkey, value); } + void processTransferEncoding(const string& te) + { + if (te == "chunked") { + chunkedTransfer = true; + } else { + SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te); + // failure + } + } + + void processChunkHeader() + { + if (buffer.empty()) { + // blank line after chunk data + return; + } + + int chunkSize = 0; + int semiPos = buffer.find(';'); + if (semiPos >= 0) { + // extensions ignored for the moment + chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16); + } else { + chunkSize = strutils::to_int(buffer, 16); + } + + buffer.clear(); + if (chunkSize == 0) { // trailer start + state = STATE_GETTING_TRAILER; + return; + } + + state = STATE_GETTING_CHUNKED_BYTES; + setByteCount(chunkSize); + } + + void processTrailer() + { + if (buffer.empty()) { + // end of trailers + responseComplete(); + return; + } + + // process as a normal header + processHeader(); + } + void headersComplete() { activeRequest->responseHeadersComplete(); @@ -171,24 +602,67 @@ private: void responseComplete() { + //std::cout << "responseComplete:" << activeRequest->url() << std::endl; activeRequest->responseComplete(); client->requestFinished(this); + + if (contentDeflate) { + inflateEnd(&zlib); + } + + assert(sentRequests.front() == activeRequest); + sentRequests.pop_front(); + bool doClose = activeRequest->closeAfterComplete(); activeRequest = NULL; + + if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) { + if (doClose) { + // this will bring us into handleClose() above, which updates + // state to STATE_CLOSED + close(); + + // if we have additional requests waiting, try to start them now + tryStartNextRequest(); + } + } + + if (state != STATE_CLOSED) { + state = STATE_IDLE; + } + setTerminator("\r\n"); } enum ConnectionState { STATE_IDLE = 0, STATE_GETTING_HEADERS, - STATE_GETTING_BODY + STATE_GETTING_BODY, + STATE_GETTING_CHUNKED, + STATE_GETTING_CHUNKED_BYTES, + STATE_GETTING_TRAILER, + STATE_SOCKET_ERROR, + STATE_CLOSED ///< connection should be closed now }; Client* client; Request_ptr activeRequest; ConnectionState state; + string host; + short port; std::string buffer; int bodyTransferSize; - + SGTimeStamp idleTime; + bool chunkedTransfer; + bool noMessageBody; + int requestBodyBytesToSend; + + z_stream zlib; + unsigned char* zlibInflateBuffer; + int zlibInflateBufferSize; + unsigned char* zlibOutputBuffer; + bool contentGZip, contentDeflate; + std::list queuedRequests; + std::list sentRequests; }; Client::Client() @@ -196,20 +670,50 @@ Client::Client() setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); } +void Client::update(int waitTimeout) +{ + NetChannel::poll(waitTimeout); + + ConnectionDict::iterator it = _connections.begin(); + for (; it != _connections.end(); ) { + if (it->second->hasIdleTimeout() || it->second->hasError() || + it->second->hasErrorTimeout()) + { + // connection has been idle for a while, clean it up + // (or has an error condition, again clean it up) + ConnectionDict::iterator del = it++; + delete del->second; + _connections.erase(del); + } else { + if (it->second->shouldStartNext()) { + it->second->tryStartNextRequest(); + } + + ++it; + } + } // of connecion iteration +} + void Client::makeRequest(const Request_ptr& r) { string host = r->host(); + int port = r->port(); if (!_proxy.empty()) { host = _proxy; + port = _proxyPort; } - if (_connections.find(host) == _connections.end()) { + stringstream ss; + ss << host << "-" << port; + string connectionId = ss.str(); + + if (_connections.find(connectionId) == _connections.end()) { Connection* con = new Connection(this); - con->connectToHost(host); - _connections[host] = con; + con->setServer(host, port); + _connections[connectionId] = con; } - _connections[host]->queueRequest(r); + _connections[connectionId]->queueRequest(r); } void Client::requestFinished(Connection* con) @@ -222,9 +726,10 @@ void Client::setUserAgent(const string& ua) _userAgent = ua; } -void Client::setProxy(const string& proxy, const string& auth) +void Client::setProxy(const string& proxy, int port, const string& auth) { _proxy = proxy; + _proxyPort = port; _proxyAuth = auth; }