X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fio%2FHTTPClient.cxx;h=4d1f2c0e757292a8cb37e7ed81a8130882a9d65f;hb=3bcd0bafd5fba1eebadfd1cb8a7294d665cf1932;hp=71f03e372405a70bcab1a92c1f47f4b4e89b7255;hpb=b74d2ae9fa35026aa82987e89611fd341acda394;p=simgear.git diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index 71f03e37..4d1f2c0e 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -1,22 +1,47 @@ +/** + * \file HTTPClient.cxx - simple HTTP client engine for SimHear + */ + +// Written by James Turner +// +// Copyright (C) 2013 James Turner +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +// + #include "HTTPClient.hxx" +#include "HTTPFileRequest.hxx" #include #include +#include // rand() #include -#include #include +#include +#include #include #include -#include - #include -#include +#include #include #include #include #include +#include #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H #include "version.h" @@ -26,10 +51,6 @@ # endif #endif -using std::string; -using std::stringstream; -using std::vector; - namespace simgear { @@ -39,19 +60,31 @@ namespace HTTP extern const int DEFAULT_HTTP_PORT = 80; const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; const unsigned int MAX_INFLIGHT_REQUESTS = 32; -const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024; -const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS; - -// see http://www.ietf.org/rfc/rfc1952.txt for these values and -// detailed description of the logic -const int GZIP_HEADER_ID1 = 31; -const int GZIP_HEADER_ID2 = 139; -const int GZIP_HEADER_METHOD_DEFLATE = 8; -const int GZIP_HEADER_SIZE = 10; -const int GZIP_HEADER_FEXTRA = 1 << 2; -const int GZIP_HEADER_FNAME = 1 << 3; -const int GZIP_HEADER_COMMENT = 1 << 4; -const int GZIP_HEADER_CRC = 1 << 1; + +class Connection; +typedef std::multimap ConnectionDict; +typedef std::list RequestList; + +class Client::ClientPrivate +{ +public: + std::string userAgent; + std::string proxy; + int proxyPort; + std::string proxyAuth; + NetChannelPoller poller; + unsigned int maxConnections; + + RequestList pendingRequests; + +// connections by host (potentially more than one) + ConnectionDict connections; + + SGTimeStamp timeTransferSample; + unsigned int bytesTransferred; + unsigned int lastTransferRate; + uint64_t totalBytesDownloaded; +}; class Connection : public NetChat { @@ -59,26 +92,28 @@ public: Connection(Client* pr) : client(pr), state(STATE_CLOSED), - port(DEFAULT_HTTP_PORT), - zlibInflateBuffer(NULL), - zlibInflateBufferSize(0), - zlibOutputBuffer(NULL) + port(DEFAULT_HTTP_PORT) { - } virtual ~Connection() { - if (zlibInflateBuffer) { - free(zlibInflateBuffer); - } - - if (zlibOutputBuffer) { - free(zlibOutputBuffer); - } + } + + virtual void handleBufferRead (NetBuffer& buffer) + { + if( !activeRequest || !activeRequest->isComplete() ) + return NetChat::handleBufferRead(buffer); + + // Request should be aborted (signaled by setting its state to complete). + + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose + state = STATE_GETTING_BODY; + responseComplete(); } - void setServer(const string& h, short p) + void setServer(const std::string& h, short p) { host = h; port = p; @@ -87,28 +122,30 @@ public: // socket-level errors virtual void handleError(int error) { - if (error == ENOENT) { - // name lookup failure - // we won't have an active request yet, so the logic below won't - // fire to actually call setFailure. Let's fail all of the requests + const char* errStr = strerror(error); + if (!activeRequest) + { + // connection level failure, eg name lookup or routing + // we won't have an active request yet, so let's fail all of the + // requests since we presume it's a systematic failure for + // the host in question BOOST_FOREACH(Request_ptr req, sentRequests) { - req->setFailure(error, "hostname lookup failure"); + req->setFailure(error, errStr); } BOOST_FOREACH(Request_ptr req, queuedRequests) { - req->setFailure(error, "hostname lookup failure"); + req->setFailure(error, errStr); } - // name lookup failure, abandon all requests on this connection sentRequests.clear(); queuedRequests.clear(); } NetChat::handleError(error); if (activeRequest) { - SG_LOG(SG_IO, SG_INFO, "HTTP socket error"); - activeRequest->setFailure(error, "socket error"); + activeRequest->setFailure(error, errStr); activeRequest = NULL; + _contentDecoder.reset(); } state = STATE_SOCKET_ERROR; @@ -117,13 +154,28 @@ public: virtual void handleClose() { NetChat::handleClose(); - - if ((state == STATE_GETTING_BODY) && activeRequest) { + + // closing of the connection from the server side when getting the body, + bool canCloseState = (state == STATE_GETTING_BODY); + if (canCloseState && activeRequest) { // force state here, so responseComplete can avoid closing the // socket again state = STATE_CLOSED; responseComplete(); } else { + if (activeRequest) { + activeRequest->setFailure(500, "server closed connection"); + // remove the failed request from sentRequests, so it does + // not get restored + RequestList::iterator it = std::find(sentRequests.begin(), + sentRequests.end(), activeRequest); + if (it != sentRequests.end()) { + sentRequests.erase(it); + } + activeRequest = NULL; + _contentDecoder.reset(); + } + state = STATE_CLOSED; } @@ -138,18 +190,37 @@ public: sentRequests.clear(); } + void handleTimeout() + { + NetChat::handleError(ETIMEDOUT); + if (activeRequest) { + SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout"); + activeRequest->setFailure(ETIMEDOUT, "socket timeout"); + activeRequest = NULL; + _contentDecoder.reset(); + } + + state = STATE_SOCKET_ERROR; + } + void queueRequest(const Request_ptr& r) { - queuedRequests.push_back(r); - tryStartNextRequest(); + queuedRequests.push_back(r); + tryStartNextRequest(); } void beginResponse() { - assert(!sentRequests.empty()); - - activeRequest = sentRequests.front(); - activeRequest->responseStart(buffer); + assert(!sentRequests.empty()); + assert(state == STATE_WAITING_FOR_RESPONSE); + + activeRequest = sentRequests.front(); + try { + activeRequest->responseStart(buffer); + } catch (sg_exception& e) { + handleError(EIO); + } + state = STATE_GETTING_HEADERS; buffer.clear(); if (activeRequest->responseCode() == 204) { @@ -162,11 +233,15 @@ public: bodyTransferSize = -1; chunkedTransfer = false; - contentGZip = contentDeflate = false; + _contentDecoder.reset(); } void tryStartNextRequest() { + while( !queuedRequests.empty() + && queuedRequests.front()->isComplete() ) + queuedRequests.pop_front(); + if (queuedRequests.empty()) { idleTime.stamp(); return; @@ -178,6 +253,7 @@ public: if (state == STATE_CLOSED) { if (!connectToHost()) { + return; } @@ -186,28 +262,29 @@ public: } Request_ptr r = queuedRequests.front(); - requestBodyBytesToSend = r->requestBodyLength(); - - stringstream headerData; - string path = r->path(); + r->requestStart(); + + std::stringstream headerData; + std::string path = r->path(); assert(!path.empty()); - string query = r->query(); - string bodyData; + std::string query = r->query(); + std::string bodyData; if (!client->proxyHost().empty()) { path = r->scheme() + "://" + r->host() + r->path(); } - if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) { + if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) { headerData << r->method() << " " << path << " HTTP/1.1\r\n"; bodyData = query.substr(1); // URL-encode, drop the leading '?' headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n"; headerData << "Content-Length:" << bodyData.size() << "\r\n"; } else { headerData << r->method() << " " << path << query << " HTTP/1.1\r\n"; - if (requestBodyBytesToSend >= 0) { - headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n"; - headerData << "Content-Type:" << r->requestBodyType() << "\r\n"; + if( r->hasBodyData() ) + { + headerData << "Content-Length:" << r->bodyLength() << "\r\n"; + headerData << "Content-Type:" << r->bodyType() << "\r\n"; } } @@ -218,8 +295,8 @@ public: headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n"; } - BOOST_FOREACH(string h, r->requestHeaders()) { - headerData << h << ": " << r->header(h) << "\r\n"; + BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) { + headerData << h.first << ": " << h.second << "\r\n"; } headerData << "\r\n"; // final CRLF to terminate the headers @@ -229,163 +306,67 @@ public: bool ok = push(headerData.str().c_str()); if (!ok) { + SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket"); // we've over-stuffed the socket, give up for now, let things // drain down before trying to start any more requests. return; } - - while (requestBodyBytesToSend > 0) { - char buf[4096]; - int len = 4096; - r->getBodyData(buf, len); - if (len > 0) { - requestBodyBytesToSend -= len; - if (!bufferSend(buf, len)) { - SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer"); - state = STATE_SOCKET_ERROR; - return; + + if( r->hasBodyData() ) + for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();) + { + char buf[4096]; + size_t len = r->getBodyData(buf, body_bytes_sent, 4096); + if( len ) + { + if( !bufferSend(buf, len) ) + { + SG_LOG(SG_IO, + SG_WARN, + "overflow the HTTP::Connection output buffer"); + state = STATE_SOCKET_ERROR; + return; + } + body_bytes_sent += len; + } + else + { + SG_LOG(SG_IO, + SG_WARN, + "HTTP asynchronous request body generation is unsupported"); + break; } - } else { - SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported"); - break; } - } - //std::cout << "did send request:" << r->url() << std::endl; - // successfully sent, remove from queue, and maybe send the next + // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() << + // "\n\t @ " << reinterpret_cast(r.ptr()) << + // "\n\t on connection " << this); + // successfully sent, remove from queue, and maybe send the next queuedRequests.pop_front(); sentRequests.push_back(r); - - // pipelining, let's maybe send the next request right away + state = STATE_WAITING_FOR_RESPONSE; + + // pipelining, let's maybe send the next request right away tryStartNextRequest(); } virtual void collectIncomingData(const char* s, int n) { idleTime.stamp(); - if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) { - if (contentGZip || contentDeflate) { - expandCompressedData(s, n); - } else { - activeRequest->processBodyBytes(s, n); - } - } else { - buffer += string(s, n); - } - } + client->receivedBytes(static_cast(n)); - - void expandCompressedData(const char* s, int n) - { - int reqSize = n + zlib.avail_in; - if (reqSize > zlibInflateBufferSize) { - // reallocate - unsigned char* newBuf = (unsigned char*) malloc(reqSize); - memcpy(newBuf, zlib.next_in, zlib.avail_in); - memcpy(newBuf + zlib.avail_in, s, n); - free(zlibInflateBuffer); - zlibInflateBuffer = newBuf; - zlibInflateBufferSize = reqSize; - } else { - // important to use memmove here, since it's very likely - // the source and destination ranges overlap - memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in); - memcpy(zlibInflateBuffer + zlib.avail_in, s, n); - } - - zlib.next_in = (unsigned char*) zlibInflateBuffer; - zlib.avail_in = reqSize; - zlib.next_out = zlibOutputBuffer; - zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; - - if (contentGZip) { - // we clear this down to contentDeflate once the GZip header has been seen - if (reqSize < GZIP_HEADER_SIZE) { - return; // need more header bytes - } - - if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) || - (zlibInflateBuffer[1] != GZIP_HEADER_ID2) || - (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE)) - { - return; // invalid GZip header - } - - char flags = zlibInflateBuffer[3]; - int gzipHeaderSize = GZIP_HEADER_SIZE; - if (flags & GZIP_HEADER_FEXTRA) { - gzipHeaderSize += 2; - if (reqSize < gzipHeaderSize) { - return; // need more header bytes - } - - unsigned short extraHeaderBytes = *(reinterpret_cast(zlibInflateBuffer + GZIP_HEADER_FEXTRA)); - if ( sgIsBigEndian() ) { - sgEndianSwap( &extraHeaderBytes ); - } - - gzipHeaderSize += extraHeaderBytes; - if (reqSize < gzipHeaderSize) { - return; // need more header bytes - } - } - - if (flags & GZIP_HEADER_FNAME) { - gzipHeaderSize++; - while (gzipHeaderSize <= reqSize) { - if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { - break; // found terminating NULL character - } - } - } - - if (flags & GZIP_HEADER_COMMENT) { - gzipHeaderSize++; - while (gzipHeaderSize <= reqSize) { - if (zlibInflateBuffer[gzipHeaderSize-1] == 0) { - break; // found terminating NULL character - } - } - } - - if (flags & GZIP_HEADER_CRC) { - gzipHeaderSize += 2; - } - - if (reqSize < gzipHeaderSize) { - return; // need more header bytes - } - - zlib.next_in += gzipHeaderSize; - zlib.avail_in = reqSize - gzipHeaderSize; - // now we've processed the GZip header, can decode as deflate - contentGZip = false; - contentDeflate = true; - } - - int writtenSize = 0; - do { - int result = inflate(&zlib, Z_NO_FLUSH); - if (result == Z_OK || result == Z_STREAM_END) { - - } else { - SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result); - return; - } - - writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out; - } while ((writtenSize == 0) && (zlib.avail_in > 0)); - - if (writtenSize > 0) { - activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize); - } + if( (state == STATE_GETTING_BODY) + || (state == STATE_GETTING_CHUNKED_BYTES) ) + _contentDecoder.receivedBytes(s, n); + else + buffer.append(s, n); } - + virtual void foundTerminator(void) { idleTime.stamp(); switch (state) { - case STATE_IDLE: + case STATE_WAITING_FOR_RESPONSE: beginResponse(); break; @@ -405,6 +386,7 @@ public: case STATE_GETTING_CHUNKED_BYTES: setTerminator("\r\n"); state = STATE_GETTING_CHUNKED; + buffer.clear(); break; @@ -413,6 +395,9 @@ public: buffer.clear(); break; + case STATE_IDLE: + SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?"); + default: break; } @@ -424,6 +409,7 @@ public: return false; } + assert(sentRequests.empty()); return idleTime.elapsedMSec() > 1000 * 10; // ten seconds } @@ -445,6 +431,11 @@ public: { return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS); } + + bool isActive() const + { + return !queuedRequests.empty() || !sentRequests.empty(); + } private: bool connectToHost() { @@ -465,37 +456,9 @@ private: void processHeader() { - string h = strutils::simplify(buffer); + std::string h = strutils::simplify(buffer); if (h.empty()) { // blank line terminates headers headersComplete(); - - if (contentGZip || contentDeflate) { - memset(&zlib, 0, sizeof(z_stream)); - if (!zlibOutputBuffer) { - zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE); - } - - // NULLs means we'll get default alloc+free methods - // which is absolutely fine - zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE; - zlib.next_out = zlibOutputBuffer; - if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) { - SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed"); - } - } - - if (chunkedTransfer) { - state = STATE_GETTING_CHUNKED; - } else if (noMessageBody || (bodyTransferSize == 0)) { - // force the state to GETTING_BODY, to simplify logic in - // responseComplete and handleClose - state = STATE_GETTING_BODY; - responseComplete(); - } else { - setByteCount(bodyTransferSize); // may be -1, that's fine - state = STATE_GETTING_BODY; - } - return; } @@ -505,9 +468,9 @@ private: return; } - string key = strutils::simplify(buffer.substr(0, colonPos)); - string lkey = boost::to_lower_copy(key); - string value = strutils::strip(buffer.substr(colonPos + 1)); + std::string key = strutils::simplify(buffer.substr(0, colonPos)); + std::string lkey = boost::to_lower_copy(key); + std::string value = strutils::strip(buffer.substr(colonPos + 1)); // only consider these if getting headers (as opposed to trailers // of a chunked transfer) @@ -524,20 +487,14 @@ private: } else if (lkey == "transfer-encoding") { processTransferEncoding(value); } else if (lkey == "content-encoding") { - if (value == "gzip") { - contentGZip = true; - } else if (value == "deflate") { - contentDeflate = true; - } else if (value != "identity") { - SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value); - } + _contentDecoder.setEncoding(value); } } activeRequest->responseHeader(lkey, value); } - void processTransferEncoding(const string& te) + void processTransferEncoding(const std::string& te) { if (te == "chunked") { chunkedTransfer = true; @@ -553,7 +510,7 @@ private: // blank line after chunk data return; } - + int chunkSize = 0; int semiPos = buffer.find(';'); if (semiPos >= 0) { @@ -588,17 +545,25 @@ private: void headersComplete() { activeRequest->responseHeadersComplete(); + _contentDecoder.initWithRequest(activeRequest); + + if (chunkedTransfer) { + state = STATE_GETTING_CHUNKED; + } else if (noMessageBody || (bodyTransferSize == 0)) { + // force the state to GETTING_BODY, to simplify logic in + // responseComplete and handleClose + state = STATE_GETTING_BODY; + responseComplete(); + } else { + setByteCount(bodyTransferSize); // may be -1, that's fine + state = STATE_GETTING_BODY; + } } void responseComplete() { - //std::cout << "responseComplete:" << activeRequest->url() << std::endl; - activeRequest->responseComplete(); - client->requestFinished(this); - - if (contentDeflate) { - inflateEnd(&zlib); - } + Request_ptr completedRequest = activeRequest; + _contentDecoder.finish(); assert(sentRequests.front() == activeRequest); sentRequests.pop_front(); @@ -609,21 +574,29 @@ private: if (doClose) { // this will bring us into handleClose() above, which updates // state to STATE_CLOSED - close(); + close(); // if we have additional requests waiting, try to start them now - tryStartNextRequest(); - } + tryStartNextRequest(); + } } - if (state != STATE_CLOSED) { - state = STATE_IDLE; - } - setTerminator("\r\n"); + if (state != STATE_CLOSED) { + state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE; + } + + // notify request after we change state, so this connection is idle + // if completion triggers other requests (which is likely) + // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url()); + completedRequest->responseComplete(); + client->requestFinished(this); + + setTerminator("\r\n"); } enum ConnectionState { STATE_IDLE = 0, + STATE_WAITING_FOR_RESPONSE, STATE_GETTING_HEADERS, STATE_GETTING_BODY, STATE_GETTING_CHUNKED, @@ -636,74 +609,187 @@ private: Client* client; Request_ptr activeRequest; ConnectionState state; - string host; + std::string host; short port; std::string buffer; int bodyTransferSize; SGTimeStamp idleTime; bool chunkedTransfer; bool noMessageBody; - int requestBodyBytesToSend; - - z_stream zlib; - unsigned char* zlibInflateBuffer; - int zlibInflateBufferSize; - unsigned char* zlibOutputBuffer; - bool contentGZip, contentDeflate; - - std::list queuedRequests; - std::list sentRequests; + + RequestList queuedRequests; + RequestList sentRequests; + + ContentDecoder _contentDecoder; }; -Client::Client() +Client::Client() : + d(new ClientPrivate) { + d->proxyPort = 0; + d->maxConnections = 4; + d->bytesTransferred = 0; + d->lastTransferRate = 0; + d->timeTransferSample.stamp(); + d->totalBytesDownloaded = 0; + setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION)); } +Client::~Client() +{ +} + +void Client::setMaxConnections(unsigned int maxCon) +{ + if (maxCon < 1) { + throw sg_range_exception("illegal HTTP::Client::setMaxConnections value"); + } + + d->maxConnections = maxCon; +} + void Client::update(int waitTimeout) { - NetChannel::poll(waitTimeout); - - ConnectionDict::iterator it = _connections.begin(); - for (; it != _connections.end(); ) { - if (it->second->hasIdleTimeout() || it->second->hasError() || - it->second->hasErrorTimeout()) + if (!d->poller.hasChannels() && (waitTimeout > 0)) { + SGTimeStamp::sleepForMSec(waitTimeout); + } else { + d->poller.poll(waitTimeout); + } + + bool waitingRequests = !d->pendingRequests.empty(); + ConnectionDict::iterator it = d->connections.begin(); + for (; it != d->connections.end(); ) { + Connection* con = it->second; + if (con->hasIdleTimeout() || + con->hasError() || + con->hasErrorTimeout() || + (!con->isActive() && waitingRequests)) { + if (con->hasErrorTimeout()) { + // tell the connection we're timing it out + con->handleTimeout(); + } + // connection has been idle for a while, clean it up - // (or has an error condition, again clean it up) + // (or if we have requests waiting for a different host, + // or an error condition ConnectionDict::iterator del = it++; delete del->second; - _connections.erase(del); + d->connections.erase(del); } else { if (it->second->shouldStartNext()) { it->second->tryStartNextRequest(); } - ++it; } - } // of connecion iteration + } // of connection iteration + + if (waitingRequests && (d->connections.size() < d->maxConnections)) { + RequestList waiting(d->pendingRequests); + d->pendingRequests.clear(); + + // re-submit all waiting requests in order; this takes care of + // finding multiple pending items targetted to the same (new) + // connection + BOOST_FOREACH(Request_ptr req, waiting) { + makeRequest(req); + } + } } void Client::makeRequest(const Request_ptr& r) { - string host = r->host(); + if( r->isComplete() ) + return; + + if( r->url().find("://") == std::string::npos ) { + r->setFailure(EINVAL, "malformed URL"); + return; + } + + if( r->url().find("http://") != 0 ) { + r->setFailure(EINVAL, "only HTTP protocol is supported"); + return; + } + + std::string host = r->host(); int port = r->port(); - if (!_proxy.empty()) { - host = _proxy; - port = _proxyPort; + if (!d->proxy.empty()) { + host = d->proxy; + port = d->proxyPort; } - stringstream ss; + Connection* con = NULL; + std::stringstream ss; ss << host << "-" << port; - string connectionId = ss.str(); + std::string connectionId = ss.str(); + bool havePending = !d->pendingRequests.empty(); + bool atConnectionsLimit = d->connections.size() >= d->maxConnections; + ConnectionDict::iterator consEnd = d->connections.end(); + + // assign request to an existing Connection. + // various options exist here, examined in order + ConnectionDict::iterator it = d->connections.find(connectionId); + if (atConnectionsLimit && (it == consEnd)) { + // maximum number of connections active, queue this request + // when a connection goes inactive, we'll start this one + d->pendingRequests.push_back(r); + return; + } + + // scan for an idle Connection to the same host (likely if we're + // retrieving multiple resources from the same host in quick succession) + // if we have pending requests (waiting for a free Connection), then + // force new requests on this id to always use the first Connection + // (instead of the random selection below). This ensures that when + // there's pressure on the number of connections to keep alive, one + // host can't DoS every other. + int count = 0; + for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) { + if (havePending || !it->second->isActive()) { + con = it->second; + break; + } + } + + if (!con && atConnectionsLimit) { + // all current connections are busy (active), and we don't + // have free connections to allocate, so let's assign to + // an existing one randomly. Ideally we'd used whichever one will + // complete first but we don't have that info. + int index = rand() % count; + for (it = d->connections.find(connectionId); index > 0; --index) { ; } + con = it->second; + } - if (_connections.find(connectionId) == _connections.end()) { - Connection* con = new Connection(this); + // allocate a new connection object + if (!con) { + con = new Connection(this); con->setServer(host, port); - _connections[connectionId] = con; + d->poller.addChannel(con); + d->connections.insert(d->connections.end(), + ConnectionDict::value_type(connectionId, con)); } - _connections[connectionId]->queueRequest(r); + con->queueRequest(r); +} + +//------------------------------------------------------------------------------ +FileRequestRef Client::save( const std::string& url, + const std::string& filename ) +{ + FileRequestRef req = new FileRequest(url, filename); + makeRequest(req); + return req; +} + +//------------------------------------------------------------------------------ +MemoryRequestRef Client::load(const std::string& url) +{ + MemoryRequestRef req = new MemoryRequest(url); + makeRequest(req); + return req; } void Client::requestFinished(Connection* con) @@ -711,16 +797,80 @@ void Client::requestFinished(Connection* con) } -void Client::setUserAgent(const string& ua) +void Client::setUserAgent(const std::string& ua) +{ + d->userAgent = ua; +} + +const std::string& Client::userAgent() const { - _userAgent = ua; + return d->userAgent; +} + +const std::string& Client::proxyHost() const +{ + return d->proxy; +} + +const std::string& Client::proxyAuth() const +{ + return d->proxyAuth; +} + +void Client::setProxy( const std::string& proxy, + int port, + const std::string& auth ) +{ + d->proxy = proxy; + d->proxyPort = port; + d->proxyAuth = auth; +} + +bool Client::hasActiveRequests() const +{ + ConnectionDict::const_iterator it = d->connections.begin(); + for (; it != d->connections.end(); ++it) { + if (it->second->isActive()) return true; + } + + return false; +} + +void Client::receivedBytes(unsigned int count) +{ + d->bytesTransferred += count; + d->totalBytesDownloaded += count; +} + +unsigned int Client::transferRateBytesPerSec() const +{ + unsigned int e = d->timeTransferSample.elapsedMSec(); + if (e > 400) { + // too long a window, ignore + d->timeTransferSample.stamp(); + d->bytesTransferred = 0; + d->lastTransferRate = 0; + return 0; + } + + if (e < 100) { // avoid really narrow windows + return d->lastTransferRate; + } + + unsigned int ratio = (d->bytesTransferred * 1000) / e; + // run a low-pass filter + unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio); + smoothed /= 400; + + d->timeTransferSample.stamp(); + d->bytesTransferred = 0; + d->lastTransferRate = smoothed; + return smoothed; } -void Client::setProxy(const string& proxy, int port, const string& auth) +uint64_t Client::totalBytesDownloaded() const { - _proxy = proxy; - _proxyPort = port; - _proxyAuth = auth; + return d->totalBytesDownloaded; } } // of namespace HTTP