X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=4d1f2c0e757292a8cb37e7ed81a8130882a9d65f;hb=3bcd0bafd5fba1eebadfd1cb8a7294d665cf1932;hp=8b4b25941bded817096a6323cd02e8d5beba4ffe;hpb=8e75c6be5047bdb0deacc385decc4ff4187c4990;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index 8b4b2594..4d1f2c0e 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -22,12 +22,12 @@ // #include "HTTPClient.hxx" +#include "HTTPFileRequest.hxx" #include #include #include // rand() #include -#include #include #include #include @@ -51,10 +51,6 @@ # endif #endif -using std::string; -using std::stringstream; -using std::vector; - namespace simgear { @@ -87,6 +83,7 @@ public: SGTimeStamp timeTransferSample; unsigned int bytesTransferred; unsigned int lastTransferRate; + uint64_t totalBytesDownloaded; }; class Connection : public NetChat @@ -102,8 +99,21 @@ public: virtual ~Connection() { } + + virtual void handleBufferRead (NetBuffer& buffer) + { + if( !activeRequest || !activeRequest->isComplete() ) + return NetChat::handleBufferRead(buffer); + + // Request should be aborted (signaled by setting its state to complete). + + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose + state = STATE_GETTING_BODY; + responseComplete(); + } - void setServer(const string& h, short p) + void setServer(const std::string& h, short p) { host = h; port = p; @@ -112,27 +122,28 @@ public: // socket-level errors virtual void handleError(int error) { - if (error == ENOENT) { - // name lookup failure - // we won't have an active request yet, so the logic below won't - // fire to actually call setFailure. Let's fail all of the requests + const char* errStr = strerror(error); + if (!activeRequest) + { + // connection level failure, eg name lookup or routing + // we won't have an active request yet, so let's fail all of the + // requests since we presume it's a systematic failure for + // the host in question BOOST_FOREACH(Request_ptr req, sentRequests) { - req->setFailure(error, "hostname lookup failure"); + req->setFailure(error, errStr); } BOOST_FOREACH(Request_ptr req, queuedRequests) { - req->setFailure(error, "hostname lookup failure"); + req->setFailure(error, errStr); } - // name lookup failure, abandon all requests on this connection sentRequests.clear(); queuedRequests.clear(); } NetChat::handleError(error); if (activeRequest) { - SG_LOG(SG_IO, SG_INFO, "HTTP socket error"); - activeRequest->setFailure(error, "socket error"); + activeRequest->setFailure(error, errStr); activeRequest = NULL; _contentDecoder.reset(); } @@ -204,8 +215,12 @@ public: assert(state == STATE_WAITING_FOR_RESPONSE); activeRequest = sentRequests.front(); + try { + activeRequest->responseStart(buffer); + } catch (sg_exception& e) { + handleError(EIO); + } - activeRequest->responseStart(buffer); state = STATE_GETTING_HEADERS; buffer.clear(); if (activeRequest->responseCode() == 204) { @@ -223,6 +238,10 @@ public: void tryStartNextRequest() { + while( !queuedRequests.empty() + && queuedRequests.front()->isComplete() ) + queuedRequests.pop_front(); + if (queuedRequests.empty()) { idleTime.stamp(); return; @@ -234,6 +253,7 @@ public: if (state == STATE_CLOSED) { if (!connectToHost()) { + return; } @@ -243,28 +263,28 @@ public: Request_ptr r = queuedRequests.front(); r->requestStart(); - requestBodyBytesToSend = r->requestBodyLength(); - - stringstream headerData; - string path = r->path(); + + std::stringstream headerData; + std::string path = r->path(); assert(!path.empty()); - string query = r->query(); - string bodyData; + std::string query = r->query(); + std::string bodyData; if (!client->proxyHost().empty()) { path = r->scheme() + "://" + r->host() + r->path(); } - if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) { + if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) { headerData << r->method() << " " << path << " HTTP/1.1\r\n"; bodyData = query.substr(1); // URL-encode, drop the leading '?' headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; headerData << "Content-Length:" << bodyData.size() << "\r\n"; } else { headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; - if (requestBodyBytesToSend >= 0) { - headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; - headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + if( r->hasBodyData() ) + { + headerData << "Content-Length:" << r->bodyLength() << "\r\n"; + headerData << "Content-Type:" << r->bodyType() << "\r\n"; } } @@ -275,8 +295,8 @@ public: headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; } - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; + BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) { + headerData << h.first << ": " << h.second << "\r\n"; } headerData << "\r\n"; // final CRLF to terminate the headers @@ -291,33 +311,42 @@ public: // drain down before trying to start any more requests. return; } - - while (requestBodyBytesToSend > 0) { - char buf[4096]; - int len = r->getBodyData(buf, 4096); - if (len > 0) { - requestBodyBytesToSend -= len; - if (!bufferSend(buf, len)) { - SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); - state = STATE_SOCKET_ERROR; - return; + + if( r->hasBodyData() ) + for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();) + { + char buf[4096]; + size_t len = r->getBodyData(buf, body_bytes_sent, 4096); + if( len ) + { + if( !bufferSend(buf, len) ) + { + SG_LOG(SG_IO, + SG_WARN, + "overflow the HTTP::Connection output buffer"); + state = STATE_SOCKET_ERROR; + return; + } + body_bytes_sent += len; + } + else + { + SG_LOG(SG_IO, + SG_WARN, + "HTTP asynchronous request body generation is unsupported"); + break; } - // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%"); - } else { - SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported"); - break; } - } - // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() << - // "\n\t @ " << reinterpret_cast(r.ptr()) << - // "\n\t on connection " << this); - // successfully sent, remove from queue, and maybe send the next + // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() << + // "\n\t @ " << reinterpret_cast(r.ptr()) << + // "\n\t on connection " << this); + // successfully sent, remove from queue, and maybe send the next queuedRequests.pop_front(); sentRequests.push_back(r); - state = STATE_WAITING_FOR_RESPONSE; + state = STATE_WAITING_FOR_RESPONSE; - // pipelining, let's maybe send the next request right away + // pipelining, let's maybe send the next request right away tryStartNextRequest(); } @@ -325,12 +354,12 @@ public: { idleTime.stamp(); client->receivedBytes(static_cast(n)); - - if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { - _contentDecoder.receivedBytes(s, n); - } else { - buffer += string(s, n); - } + + if( (state == STATE_GETTING_BODY) + || (state == STATE_GETTING_CHUNKED_BYTES) ) + _contentDecoder.receivedBytes(s, n); + else + buffer.append(s, n); } virtual void foundTerminator(void) @@ -427,7 +456,7 @@ private: void processHeader() { - string h = strutils::simplify(buffer); + std::string h = strutils::simplify(buffer); if (h.empty()) { // blank line terminates headers headersComplete(); return; @@ -439,9 +468,9 @@ private: return; } - string key = strutils::simplify(buffer.substr(0, colonPos)); - string lkey = boost::to_lower_copy(key); - string value = strutils::strip(buffer.substr(colonPos + 1)); + std::string key = strutils::simplify(buffer.substr(0, colonPos)); + std::string lkey = boost::to_lower_copy(key); + std::string value = strutils::strip(buffer.substr(colonPos + 1)); // only consider these if getting headers (as opposed to trailers // of a chunked transfer) @@ -465,7 +494,7 @@ private: activeRequest->responseHeader(lkey, value); } - void processTransferEncoding(const string& te) + void processTransferEncoding(const std::string& te) { if (te == "chunked") { chunkedTransfer = true; @@ -533,7 +562,7 @@ private: void responseComplete() { - Request_ptr completedRequest = activeRequest; + Request_ptr completedRequest = activeRequest; _contentDecoder.finish(); assert(sentRequests.front() == activeRequest); @@ -580,14 +609,13 @@ private: Client* client; Request_ptr activeRequest; ConnectionState state; - string host; + std::string host; short port; std::string buffer; int bodyTransferSize; SGTimeStamp idleTime; bool chunkedTransfer; bool noMessageBody; - int requestBodyBytesToSend; RequestList queuedRequests; RequestList sentRequests; @@ -603,6 +631,7 @@ Client::Client() : d->bytesTransferred = 0; d->lastTransferRate = 0; d->timeTransferSample.stamp(); + d->totalBytesDownloaded = 0; setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); } @@ -622,9 +651,13 @@ void Client::setMaxConnections(unsigned int maxCon) void Client::update(int waitTimeout) { - d->poller.poll(waitTimeout); - bool waitingRequests = !d->pendingRequests.empty(); + if (!d->poller.hasChannels() && (waitTimeout > 0)) { + SGTimeStamp::sleepForMSec(waitTimeout); + } else { + d->poller.poll(waitTimeout); + } + bool waitingRequests = !d->pendingRequests.empty(); ConnectionDict::iterator it = d->connections.begin(); for (; it != d->connections.end(); ) { Connection* con = it->second; @@ -667,10 +700,20 @@ void Client::update(int waitTimeout) void Client::makeRequest(const Request_ptr& r) { - if( r->url().find("://") == std::string::npos ) - throw std::runtime_error("Unable to parse URL (missing scheme)"); + if( r->isComplete() ) + return; + + if( r->url().find("://") == std::string::npos ) { + r->setFailure(EINVAL, "malformed URL"); + return; + } - string host = r->host(); + if( r->url().find("http://") != 0 ) { + r->setFailure(EINVAL, "only HTTP protocol is supported"); + return; + } + + std::string host = r->host(); int port = r->port(); if (!d->proxy.empty()) { host = d->proxy; @@ -678,9 +721,9 @@ void Client::makeRequest(const Request_ptr& r) } Connection* con = NULL; - stringstream ss; + std::stringstream ss; ss << host << "-" << port; - string connectionId = ss.str(); + std::string connectionId = ss.str(); bool havePending = !d->pendingRequests.empty(); bool atConnectionsLimit = d->connections.size() >= d->maxConnections; ConnectionDict::iterator consEnd = d->connections.end(); @@ -732,12 +775,29 @@ void Client::makeRequest(const Request_ptr& r) con->queueRequest(r); } +//------------------------------------------------------------------------------ +FileRequestRef Client::save( const std::string& url, + const std::string& filename ) +{ + FileRequestRef req = new FileRequest(url, filename); + makeRequest(req); + return req; +} + +//------------------------------------------------------------------------------ +MemoryRequestRef Client::load(const std::string& url) +{ + MemoryRequestRef req = new MemoryRequest(url); + makeRequest(req); + return req; +} + void Client::requestFinished(Connection* con) { } -void Client::setUserAgent(const string& ua) +void Client::setUserAgent(const std::string& ua) { d->userAgent = ua; } @@ -757,7 +817,9 @@ const std::string& Client::proxyAuth() const return d->proxyAuth; } -void Client::setProxy(const string& proxy, int port, const string& auth) +void Client::setProxy( const std::string& proxy, + int port, + const std::string& auth ) { d->proxy = proxy; d->proxyPort = port; @@ -777,22 +839,38 @@ bool Client::hasActiveRequests() const void Client::receivedBytes(unsigned int count) { d->bytesTransferred += count; + d->totalBytesDownloaded += count; } unsigned int Client::transferRateBytesPerSec() const { unsigned int e = d->timeTransferSample.elapsedMSec(); - if (e < 400) { - // if called too frequently, return cahced value, to smooth out - // < 1 sec changes in flow + if (e > 400) { + // too long a window, ignore + d->timeTransferSample.stamp(); + d->bytesTransferred = 0; + d->lastTransferRate = 0; + return 0; + } + + if (e < 100) { // avoid really narrow windows return d->lastTransferRate; } unsigned int ratio = (d->bytesTransferred * 1000) / e; + // run a low-pass filter + unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio); + smoothed /= 400; + d->timeTransferSample.stamp(); d->bytesTransferred = 0; - d->lastTransferRate = ratio; - return ratio; + d->lastTransferRate = smoothed; + return smoothed; +} + +uint64_t Client::totalBytesDownloaded() const +{ + return d->totalBytesDownloaded; } } // of namespace HTTP