]> git.mxchange.org Git - simgear.git/blobdiff - simgear/io/HTTPClient.cxx
HTTP: tweak for malformed header handling.
[simgear.git] / simgear / io / HTTPClient.cxx
index ad06756f5e382baee75583a8cc4a6bf8ecfd7fb5..ecc63f844054154a5a99230658848f2d46da05fb 100644 (file)
@@ -1,33 +1,56 @@
+/**
+ * \file HTTPClient.cxx - simple HTTP client engine for SimHear
+ */
+
+// Written by James Turner
+//
+// Copyright (C) 2013  James Turner  <zakalawe@mac.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+//
+
 #include "HTTPClient.hxx"
+#include "HTTPFileRequest.hxx"
 
 #include <sstream>
 #include <cassert>
+#include <cstdlib> // rand()
 #include <list>
+#include <errno.h>
+#include <map>
+#include <stdexcept>
 
 #include <boost/foreach.hpp>
 #include <boost/algorithm/string/case_conv.hpp>
 
 #include <simgear/io/sg_netChat.hxx>
+#include <simgear/io/HTTPContentDecode.hxx>
 #include <simgear/misc/strutils.hxx>
 #include <simgear/compiler.h>
 #include <simgear/debug/logstream.hxx>
 #include <simgear/timing/timestamp.hxx>
+#include <simgear/structure/exception.hxx>
 
 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
 #include "version.h"
 #else
 #  if !defined(SIMGEAR_VERSION)
-#    define SIMGEAR_VERSION development
+#    define SIMGEAR_VERSION "simgear-development"
 #  endif
 #endif
 
-using std::string;
-using std::stringstream;
-using std::vector;
-
-//#include <iostream>
-//using namespace std;
-
 namespace simgear
 {
 
@@ -35,7 +58,40 @@ namespace HTTP
 {
 
 extern const int DEFAULT_HTTP_PORT = 80;
+const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
+const unsigned int MAX_INFLIGHT_REQUESTS = 32;
+
+class Connection;
+typedef std::multimap<std::string, Connection*> ConnectionDict;
+typedef std::list<Request_ptr> RequestList;
 
+static bool isFailureStatus(int httpStatus)
+{
+  int majorCode = httpStatus / 100;
+  return (majorCode != 2);
+}
+
+class Client::ClientPrivate
+{
+public:
+    std::string userAgent;
+    std::string proxy;
+    int proxyPort;
+    std::string proxyAuth;
+    NetChannelPoller poller;
+    unsigned int maxConnections;
+    
+    RequestList pendingRequests;
+    
+// connections by host (potentially more than one)
+    ConnectionDict connections;
+    
+    SGTimeStamp timeTransferSample;
+    unsigned int bytesTransferred;
+    unsigned int lastTransferRate;
+    uint64_t totalBytesDownloaded;
+};
+  
 class Connection : public NetChat
 {
 public:
@@ -44,10 +100,26 @@ public:
         state(STATE_CLOSED),
         port(DEFAULT_HTTP_PORT)
     {
-        
     }
     
-    void setServer(const string& h, short p)
+    virtual ~Connection()
+    {
+    }
+
+    virtual void handleBufferRead (NetBuffer& buffer)
+    {
+      if( !activeRequest || !activeRequest->isComplete() )
+        return NetChat::handleBufferRead(buffer);
+
+      // Request should be aborted (signaled by setting its state to complete).
+
+      // force the state to GETTING_BODY, to simplify logic in
+      // responseComplete and handleClose
+      state = STATE_GETTING_BODY;
+      responseComplete();
+    }
+  
+    void setServer(const std::string& h, short p)
     {
         host = h;
         port = p;
@@ -55,101 +127,251 @@ public:
     
     // socket-level errors
     virtual void handleError(int error)
-    {        
+    {
+        if (error == ENOENT) {
+        // name lookup failure
+            // we won't have an active request yet, so the logic below won't
+            // fire to actually call setFailure. Let's fail all of the requests
+            BOOST_FOREACH(Request_ptr req, sentRequests) {
+                req->setFailure(error, "hostname lookup failure");
+            }
+            
+            BOOST_FOREACH(Request_ptr req, queuedRequests) {
+                req->setFailure(error, "hostname lookup failure");
+            }
+            
+        // name lookup failure, abandon all requests on this connection
+            sentRequests.clear();
+            queuedRequests.clear();
+        }
+        
         NetChat::handleError(error);
-        if (activeRequest) {
+        if (activeRequest) {            
             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
             activeRequest->setFailure(error, "socket error");
             activeRequest = NULL;
+            _contentDecoder.reset();
         }
     
         state = STATE_SOCKET_ERROR;
     }
     
     virtual void handleClose()
-    {
+    {      
         NetChat::handleClose();
-        
-        if ((state == STATE_GETTING_BODY) && activeRequest) {
+
+    // closing of the connection from the server side when getting the body,
+        bool canCloseState = (state == STATE_GETTING_BODY);
+        if (canCloseState && activeRequest) {
         // force state here, so responseComplete can avoid closing the 
         // socket again
             state =  STATE_CLOSED;
             responseComplete();
         } else {
+            if (activeRequest) {
+                activeRequest->setFailure(500, "server closed connection");
+                // remove the failed request from sentRequests, so it does 
+                // not get restored
+                RequestList::iterator it = std::find(sentRequests.begin(), 
+                    sentRequests.end(), activeRequest);
+                if (it != sentRequests.end()) {
+                    sentRequests.erase(it);
+                }
+                activeRequest = NULL;
+                _contentDecoder.reset();
+            }
+            
             state = STATE_CLOSED;
         }
+      
+      if (sentRequests.empty()) {
+        return;
+      }
+      
+    // restore sent requests to the queue, so they will be re-sent
+    // when the connection opens again
+      queuedRequests.insert(queuedRequests.begin(),
+                              sentRequests.begin(), sentRequests.end());
+      sentRequests.clear();
     }
     
-    void queueRequest(const Request_ptr& r)
+    void handleTimeout()
     {
-        if (!activeRequest) {
-            startRequest(r);
-        } else {
-            queuedRequests.push_back(r);
+        NetChat::handleError(ETIMEDOUT);
+        if (activeRequest) {
+            SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
+            activeRequest->setFailure(ETIMEDOUT, "socket timeout");
+            activeRequest = NULL;
+            _contentDecoder.reset();
         }
+        
+        state = STATE_SOCKET_ERROR;
     }
     
-    void startRequest(const Request_ptr& r)
+    void queueRequest(const Request_ptr& r)
     {
-        if (state == STATE_CLOSED) {
-            if (!connectToHost()) {
-                return;
-            }
-            
-            state = STATE_IDLE;
-        }
-                
-        activeRequest = r;
-        state = STATE_IDLE;
-        bodyTransferSize = -1;
-        chunkedTransfer = false;
-        setTerminator("\r\n");
+        queuedRequests.push_back(r);
+        tryStartNextRequest();
+    }
+    
+    void beginResponse()
+    {
+        assert(!sentRequests.empty());
+        assert(state == STATE_WAITING_FOR_RESPONSE);
         
-        stringstream headerData;
-        string path = r->path();
-        if (!client->proxyHost().empty()) {
-            path = r->url();
-        }
+        activeRequest = sentRequests.front();
+       activeRequest->responseStart(buffer);
+       if (isFailureStatus(activeRequest->responseCode())) {
+         handleError(EIO);
+         return;
+       }
+      
+      state = STATE_GETTING_HEADERS;
+      buffer.clear();
+      if (activeRequest->responseCode() == 204) {
+        noMessageBody = true;
+      } else if (activeRequest->method() == "HEAD") {
+        noMessageBody = true;
+      } else {
+        noMessageBody = false;
+      }
 
-        headerData << r->method() << " " << path << " HTTP/1.1\r\n";
-        headerData << "Host: " << r->hostAndPort() << "\r\n";
-        headerData << "User-Agent:" << client->userAgent() << "\r\n";
-        if (!client->proxyAuth().empty()) {
-            headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
-        }
+      bodyTransferSize = -1;
+      chunkedTransfer = false;
+      _contentDecoder.reset();
+    }
+  
+    void tryStartNextRequest()
+    {
+      while( !queuedRequests.empty()
+          && queuedRequests.front()->isComplete() )
+        queuedRequests.pop_front();
 
-        BOOST_FOREACH(string h, r->requestHeaders()) {
-            headerData << h << ": " << r->header(h) << "\r\n";
-        }
+      if (queuedRequests.empty()) {
+        idleTime.stamp();
+        return;
+      }
+      
+      if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
+        return;
+      }
+      
+      if (state == STATE_CLOSED) {
+          if (!connectToHost()) {
+              return;
+          }
+          
+          setTerminator("\r\n");
+          state = STATE_IDLE;
+      }
+     
+      Request_ptr r = queuedRequests.front();
+      r->requestStart();
 
-        headerData << "\r\n"; // final CRLF to terminate the headers
+      std::stringstream headerData;
+      std::string path = r->path();
+      assert(!path.empty());
+      std::string query = r->query();
+      std::string bodyData;
+      
+      if (!client->proxyHost().empty()) {
+          path = r->scheme() + "://" + r->host() + r->path();
+      }
 
-    // TODO - add request body support for PUT, etc operations
+      if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
+          headerData << r->method() << " " << path << " HTTP/1.1\r\n";
+          bodyData = query.substr(1); // URL-encode, drop the leading '?'
+          headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
+          headerData << "Content-Length:" << bodyData.size() << "\r\n";
+      } else {
+          headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
+          if( r->hasBodyData() )
+          {
+            headerData << "Content-Length:" << r->bodyLength() << "\r\n";
+            headerData << "Content-Type:" << r->bodyType() << "\r\n";
+          }
+      }
+      
+      headerData << "Host: " << r->hostAndPort() << "\r\n";
+      headerData << "User-Agent:" << client->userAgent() << "\r\n";
+      headerData << "Accept-Encoding: deflate, gzip\r\n";
+      if (!client->proxyAuth().empty()) {
+          headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
+      }
 
-        bool ok = push(headerData.str().c_str());
-        if (!ok) {
-            SG_LOG(SG_IO, SG_WARN, "HTTP writing to socket failed");
-            state = STATE_SOCKET_ERROR;
-            return;
-        }        
+      BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
+          headerData << h.first << ": " << h.second << "\r\n";
+      }
+
+      headerData << "\r\n"; // final CRLF to terminate the headers
+      if (!bodyData.empty()) {
+          headerData << bodyData;
+      }
+      
+      bool ok = push(headerData.str().c_str());
+      if (!ok) {
+          SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
+          // we've over-stuffed the socket, give up for now, let things
+          // drain down before trying to start any more requests.
+          return;
+      }
+
+      if( r->hasBodyData() )
+        for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
+        {
+          char buf[4096];
+          size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
+          if( len )
+          {
+            if( !bufferSend(buf, len) )
+            {
+              SG_LOG(SG_IO,
+                     SG_WARN,
+                     "overflow the HTTP::Connection output buffer");
+              state = STATE_SOCKET_ERROR;
+              return;
+            }
+            body_bytes_sent += len;
+          }
+          else
+          {
+            SG_LOG(SG_IO,
+                   SG_WARN,
+                   "HTTP asynchronous request body generation is unsupported");
+            break;
+          }
+        }
+      
+      //   SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
+      //       "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
+      //      "\n\t on connection " << this);
+      // successfully sent, remove from queue, and maybe send the next
+      queuedRequests.pop_front();
+      sentRequests.push_back(r);
+      state = STATE_WAITING_FOR_RESPONSE;
+        
+      // pipelining, let's maybe send the next request right away
+      tryStartNextRequest();
     }
     
     virtual void collectIncomingData(const char* s, int n)
     {
-        if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
-            activeRequest->processBodyBytes(s, n);
-        } else {
-            buffer += string(s, n);
-        }
+        idleTime.stamp();
+        client->receivedBytes(static_cast<unsigned int>(n));
+
+        if(   (state == STATE_GETTING_BODY)
+           || (state == STATE_GETTING_CHUNKED_BYTES) )
+          _contentDecoder.receivedBytes(s, n);
+        else
+          buffer.append(s, n);
     }
-    
+
     virtual void foundTerminator(void)
     {
+        idleTime.stamp();
         switch (state) {
-        case STATE_IDLE:
-            activeRequest->responseStart(buffer);
-            state = STATE_GETTING_HEADERS;
-            buffer.clear();
+        case STATE_WAITING_FOR_RESPONSE:
+            beginResponse();
             break;
             
         case STATE_GETTING_HEADERS:
@@ -168,13 +390,18 @@ public:
         case STATE_GETTING_CHUNKED_BYTES:
             setTerminator("\r\n");
             state = STATE_GETTING_CHUNKED;
+            buffer.clear();
             break;
             
+
         case STATE_GETTING_TRAILER:
             processTrailer();
             buffer.clear();
             break;
         
+        case STATE_IDLE:
+            SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
+                
         default:
             break;
         }
@@ -186,8 +413,18 @@ public:
             return false;
         }
         
+        assert(sentRequests.empty());
         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
     }
+  
+    bool hasErrorTimeout() const
+    {
+      if (state == STATE_IDLE) {
+        return false;
+      }
+      
+      return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
+    }
     
     bool hasError() const
     {
@@ -196,20 +433,17 @@ public:
     
     bool shouldStartNext() const
     {
-        return !activeRequest && !queuedRequests.empty() && 
-            ((state == STATE_CLOSED) || (state == STATE_IDLE));
+      return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
     }
     
-    void startNext()
+    bool isActive() const
     {
-        Request_ptr next = queuedRequests.front();
-        queuedRequests.pop_front();
-        startRequest(next);
+        return !queuedRequests.empty() || !sentRequests.empty();
     }
 private:
     bool connectToHost()
     {
-        SG_LOG(SG_IO, SG_INFO, "HTTP connecting to " << host << ":" << port);
+        SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
         
         if (!open()) {
             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
@@ -226,34 +460,27 @@ private:
     
     void processHeader()
     {
-        string h = strutils::simplify(buffer);
+        std::string h = strutils::simplify(buffer);
         if (h.empty()) { // blank line terminates headers
             headersComplete();
-            
-            if (chunkedTransfer) {
-                state = STATE_GETTING_CHUNKED;
-            } else {
-                setByteCount(bodyTransferSize); // may be -1, that's fine
-                state = STATE_GETTING_BODY;
-            }
-            
             return;
         }
-        
+              
         int colonPos = buffer.find(':');
         if (colonPos < 0) {
             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
             return;
         }
         
-        string key = strutils::simplify(buffer.substr(0, colonPos));
-        string lkey = boost::to_lower_copy(key);
-        string value = strutils::strip(buffer.substr(colonPos + 1));
+        std::string key = strutils::simplify(buffer.substr(0, colonPos));
+        std::string lkey = boost::to_lower_copy(key);
+        std::string value = strutils::strip(buffer.substr(colonPos + 1));
         
         // only consider these if getting headers (as opposed to trailers 
         // of a chunked transfer)
         if (state == STATE_GETTING_HEADERS) {
             if (lkey == "content-length") {
+
                 int sz = strutils::to_int(value);
                 if (bodyTransferSize <= 0) {
                     bodyTransferSize = sz;
@@ -263,13 +490,15 @@ private:
                 bodyTransferSize = strutils::to_int(value);
             } else if (lkey == "transfer-encoding") {
                 processTransferEncoding(value);
+            } else if (lkey == "content-encoding") {
+                _contentDecoder.setEncoding(value);
             }
         }
     
         activeRequest->responseHeader(lkey, value);
     }
     
-    void processTransferEncoding(const string& te)
+    void processTransferEncoding(const std::string& te)
     {
         if (te == "chunked") {
             chunkedTransfer = true;
@@ -285,7 +514,7 @@ private:
             // blank line after chunk data
             return;
         }
-        
+                
         int chunkSize = 0;
         int semiPos = buffer.find(';');
         if (semiPos >= 0) {
@@ -320,40 +549,58 @@ private:
     void headersComplete()
     {
         activeRequest->responseHeadersComplete();
+        _contentDecoder.initWithRequest(activeRequest);
+      
+        if (chunkedTransfer) {
+            state = STATE_GETTING_CHUNKED;
+        } else if (noMessageBody || (bodyTransferSize == 0)) {
+            // force the state to GETTING_BODY, to simplify logic in
+            // responseComplete and handleClose
+            state = STATE_GETTING_BODY;
+            responseComplete();
+        } else {
+            setByteCount(bodyTransferSize); // may be -1, that's fine
+            state = STATE_GETTING_BODY;
+        }
     }
     
     void responseComplete()
     {
-        activeRequest->responseComplete();
-        client->requestFinished(this);
-        //cout << "response complete: " << activeRequest->url() << endl;
-        
+        Request_ptr completedRequest = activeRequest;
+        _contentDecoder.finish();
+      
+        assert(sentRequests.front() == activeRequest);
+        sentRequests.pop_front();
         bool doClose = activeRequest->closeAfterComplete();
         activeRequest = NULL;
-        if (state == STATE_GETTING_BODY) {
-            state = STATE_IDLE;
+      
+        if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
             if (doClose) {
-            // this will bring us into handleClose() above, which updates
-            // state to STATE_CLOSED
-                close();
+          // this will bring us into handleClose() above, which updates
+          // state to STATE_CLOSED
+              close();
+              
+          // if we have additional requests waiting, try to start them now
+              tryStartNextRequest();
             }
         }
         
-        setTerminator("\r\n");
-        
-    // if we have more requests, and we're idle, can start the next
-    // request immediately. Note we cannot do this if we're in STATE_CLOSED,
-    // since NetChannel::close cleans up state after calling handleClose;
-    // instead we pick up waiting requests in update()
-        if (!queuedRequests.empty() && (state == STATE_IDLE)) {
-            startNext();
-        } else {
-            idleTime.stamp();
+        if (state != STATE_CLOSED)  {
+            state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
         }
+        
+    // notify request after we change state, so this connection is idle
+    // if completion triggers other requests (which is likely)
+        //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
+        completedRequest->responseComplete();
+        client->requestFinished(this);
+        
+        setTerminator("\r\n");
     }
     
     enum ConnectionState {
         STATE_IDLE = 0,
+        STATE_WAITING_FOR_RESPONSE,
         STATE_GETTING_HEADERS,
         STATE_GETTING_BODY,
         STATE_GETTING_CHUNKED,
@@ -366,64 +613,187 @@ private:
     Client* client;
     Request_ptr activeRequest;
     ConnectionState state;
-    string host;
+    std::string host;
     short port;
     std::string buffer;
     int bodyTransferSize;
     SGTimeStamp idleTime;
     bool chunkedTransfer;
+    bool noMessageBody;
     
-    std::list<Request_ptr> queuedRequests;
+    RequestList queuedRequests;
+    RequestList sentRequests;
+    
+    ContentDecoder _contentDecoder;
 };
 
-Client::Client()
+Client::Client() :
+    d(new ClientPrivate)
 {
+    d->proxyPort = 0;
+    d->maxConnections = 4;
+    d->bytesTransferred = 0;
+    d->lastTransferRate = 0;
+    d->timeTransferSample.stamp();
+    d->totalBytesDownloaded = 0;
+    
     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
 }
 
-void Client::update()
+Client::~Client()
 {
-    NetChannel::poll();
-        
-    ConnectionDict::iterator it = _connections.begin();
-    for (; it != _connections.end(); ) {
-        if (it->second->hasIdleTimeout() || it->second->hasError()) {
+}
+
+void Client::setMaxConnections(unsigned int maxCon)
+{
+    if (maxCon < 1) {
+        throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
+    }
+    
+    d->maxConnections = maxCon;
+}
+
+void Client::update(int waitTimeout)
+{
+    if (!d->poller.hasChannels() && (waitTimeout > 0)) {
+        SGTimeStamp::sleepForMSec(waitTimeout);
+    } else {
+        d->poller.poll(waitTimeout);
+    }
+    
+    bool waitingRequests = !d->pendingRequests.empty();
+    ConnectionDict::iterator it = d->connections.begin();
+    for (; it != d->connections.end(); ) {
+        Connection* con = it->second;
+        if (con->hasIdleTimeout() || 
+            con->hasError() ||
+            con->hasErrorTimeout() ||
+            (!con->isActive() && waitingRequests))
+        {
+            if (con->hasErrorTimeout()) {
+                // tell the connection we're timing it out
+                con->handleTimeout();
+            }
+            
         // connection has been idle for a while, clean it up
-        // (or has an error condition, again clean it up)
-            SG_LOG(SG_IO, SG_INFO, "cleaning up " << it->second);
+        // (or if we have requests waiting for a different host,
+        // or an error condition
             ConnectionDict::iterator del = it++;
             delete del->second;
-            _connections.erase(del);
+            d->connections.erase(del);
         } else {
             if (it->second->shouldStartNext()) {
-                it->second->startNext();
+                it->second->tryStartNextRequest();
             }
-            
             ++it;
         }
-    } // of connecion iteration
+    } // of connection iteration
+    
+    if (waitingRequests && (d->connections.size() < d->maxConnections)) {
+        RequestList waiting(d->pendingRequests);
+        d->pendingRequests.clear();
+        
+        // re-submit all waiting requests in order; this takes care of
+        // finding multiple pending items targetted to the same (new)
+        // connection
+        BOOST_FOREACH(Request_ptr req, waiting) {
+            makeRequest(req);
+        }
+    }
 }
 
 void Client::makeRequest(const Request_ptr& r)
 {
-    string host = r->host();
+    if( r->isComplete() )
+      return;
+
+    if( r->url().find("://") == std::string::npos ) {
+        r->setFailure(EINVAL, "malformed URL");
+        return;
+    }
+
+    if( r->url().find("http://") != 0 ) {
+        r->setFailure(EINVAL, "only HTTP protocol is supported");
+        return;
+    }
+    
+    std::string host = r->host();
     int port = r->port();
-    if (!_proxy.empty()) {
-        host = _proxy;
-        port = _proxyPort;
+    if (!d->proxy.empty()) {
+        host = d->proxy;
+        port = d->proxyPort;
     }
     
-    stringstream ss;
+    Connection* con = NULL;
+    std::stringstream ss;
     ss << host << "-" << port;
-    string connectionId = ss.str();
+    std::string connectionId = ss.str();
+    bool havePending = !d->pendingRequests.empty();
+    bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
+    ConnectionDict::iterator consEnd = d->connections.end();
+     
+    // assign request to an existing Connection.
+    // various options exist here, examined in order
+    ConnectionDict::iterator it = d->connections.find(connectionId);
+    if (atConnectionsLimit && (it == consEnd)) {
+        // maximum number of connections active, queue this request
+        // when a connection goes inactive, we'll start this one            
+        d->pendingRequests.push_back(r);
+        return;
+    }
+    
+    // scan for an idle Connection to the same host (likely if we're
+    // retrieving multiple resources from the same host in quick succession)
+    // if we have pending requests (waiting for a free Connection), then
+    // force new requests on this id to always use the first Connection
+    // (instead of the random selection below). This ensures that when
+    // there's pressure on the number of connections to keep alive, one
+    // host can't DoS every other.
+    int count = 0;
+    for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
+        if (havePending || !it->second->isActive()) {
+            con = it->second;
+            break;
+        }
+    }
     
-    if (_connections.find(connectionId) == _connections.end()) {
-        Connection* con = new Connection(this);
+    if (!con && atConnectionsLimit) {
+        // all current connections are busy (active), and we don't
+        // have free connections to allocate, so let's assign to
+        // an existing one randomly. Ideally we'd used whichever one will
+        // complete first but we don't have that info.
+        int index = rand() % count;
+        for (it = d->connections.find(connectionId); index > 0; --index) { ; }
+        con = it->second;
+    }
+    
+    // allocate a new connection object
+    if (!con) {
+        con = new Connection(this);
         con->setServer(host, port);
-        _connections[connectionId] = con;
+        d->poller.addChannel(con);
+        d->connections.insert(d->connections.end(), 
+            ConnectionDict::value_type(connectionId, con));
     }
     
-    _connections[connectionId]->queueRequest(r);
+    con->queueRequest(r);
+}
+
+//------------------------------------------------------------------------------
+FileRequestRef Client::save( const std::string& url,
+                             const std::string& filename )
+{
+  FileRequestRef req = new FileRequest(url, filename);
+  makeRequest(req);
+  return req;
+}
+
+//------------------------------------------------------------------------------
+MemoryRequestRef Client::load(const std::string& url)
+{
+  MemoryRequestRef req = new MemoryRequest(url);
+  makeRequest(req);
+  return req;
 }
 
 void Client::requestFinished(Connection* con)
@@ -431,16 +801,80 @@ void Client::requestFinished(Connection* con)
     
 }
 
-void Client::setUserAgent(const string& ua)
+void Client::setUserAgent(const std::string& ua)
+{
+    d->userAgent = ua;
+}
+
+const std::string& Client::userAgent() const
 {
-    _userAgent = ua;
+    return d->userAgent;
+}
+    
+const std::string& Client::proxyHost() const
+{
+    return d->proxy;
+}
+    
+const std::string& Client::proxyAuth() const
+{
+    return d->proxyAuth;
+}
+
+void Client::setProxy( const std::string& proxy,
+                       int port,
+                       const std::string& auth )
+{
+    d->proxy = proxy;
+    d->proxyPort = port;
+    d->proxyAuth = auth;
+}
+
+bool Client::hasActiveRequests() const
+{
+    ConnectionDict::const_iterator it = d->connections.begin();
+    for (; it != d->connections.end(); ++it) {
+        if (it->second->isActive()) return true;
+    }
+    
+    return false;
+}
+
+void Client::receivedBytes(unsigned int count)
+{
+    d->bytesTransferred += count;
+    d->totalBytesDownloaded += count;
+}
+    
+unsigned int Client::transferRateBytesPerSec() const
+{
+    unsigned int e = d->timeTransferSample.elapsedMSec();
+    if (e > 400) {
+        // too long a window, ignore
+        d->timeTransferSample.stamp();
+        d->bytesTransferred = 0;
+        d->lastTransferRate = 0;
+        return 0;
+    }
+    
+    if (e < 100) { // avoid really narrow windows
+        return d->lastTransferRate;
+    }
+    
+    unsigned int ratio = (d->bytesTransferred * 1000) / e;
+    // run a low-pass filter
+    unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
+    smoothed /= 400;
+        
+    d->timeTransferSample.stamp();
+    d->bytesTransferred = 0;
+    d->lastTransferRate = smoothed;
+    return smoothed;
 }
 
-void Client::setProxy(const string& proxy, int port, const string& auth)
+uint64_t Client::totalBytesDownloaded() const
 {
-    _proxy = proxy;
-    _proxyPort = port;
-    _proxyAuth = auth;
+    return d->totalBytesDownloaded;
 }
 
 } // of namespace HTTP