]> git.mxchange.org Git - simgear.git/blobdiff - simgear/io/HTTPClient.cxx
Lots of (mostly) doxygen fixes/cleanup.
[simgear.git] / simgear / io / HTTPClient.cxx
index 0630af32cc5d2b2242aaac8437bf23a00a9bfd78..4d1f2c0e757292a8cb37e7ed81a8130882a9d65f 100644 (file)
 //
 
 #include "HTTPClient.hxx"
+#include "HTTPFileRequest.hxx"
 
 #include <sstream>
 #include <cassert>
 #include <cstdlib> // rand()
 #include <list>
-#include <iostream>
 #include <errno.h>
 #include <map>
+#include <stdexcept>
 
 #include <boost/foreach.hpp>
 #include <boost/algorithm/string/case_conv.hpp>
 
-#include <zlib.h>
-
 #include <simgear/io/sg_netChat.hxx>
-#include <simgear/io/lowlevel.hxx>
+#include <simgear/io/HTTPContentDecode.hxx>
 #include <simgear/misc/strutils.hxx>
 #include <simgear/compiler.h>
 #include <simgear/debug/logstream.hxx>
 #  endif
 #endif
 
-using std::string;
-using std::stringstream;
-using std::vector;
-
 namespace simgear
 {
 
@@ -65,19 +60,6 @@ namespace HTTP
 extern const int DEFAULT_HTTP_PORT = 80;
 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
-const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
-const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
-  
-// see http://www.ietf.org/rfc/rfc1952.txt for these values and
-// detailed description of the logic 
-const int GZIP_HEADER_ID1 = 31;
-const int GZIP_HEADER_ID2 = 139;
-const int GZIP_HEADER_METHOD_DEFLATE = 8;
-const unsigned int GZIP_HEADER_SIZE = 10;
-const int GZIP_HEADER_FEXTRA = 1 << 2;
-const int GZIP_HEADER_FNAME = 1 << 3;
-const int GZIP_HEADER_COMMENT = 1 << 4;
-const int GZIP_HEADER_CRC = 1 << 1;
 
 class Connection;
 typedef std::multimap<std::string, Connection*> ConnectionDict;
@@ -97,6 +79,11 @@ public:
     
 // connections by host (potentially more than one)
     ConnectionDict connections;
+    
+    SGTimeStamp timeTransferSample;
+    unsigned int bytesTransferred;
+    unsigned int lastTransferRate;
+    uint64_t totalBytesDownloaded;
 };
   
 class Connection : public NetChat
@@ -105,26 +92,28 @@ public:
     Connection(Client* pr) :
         client(pr),
         state(STATE_CLOSED),
-        port(DEFAULT_HTTP_PORT),
-        zlibInflateBuffer(NULL),
-        zlibInflateBufferSize(0),
-        zlibOutputBuffer(NULL)
+        port(DEFAULT_HTTP_PORT)
     {
-        
     }
     
     virtual ~Connection()
     {
-      if (zlibInflateBuffer) {
-        free(zlibInflateBuffer);
-      }
-      
-      if (zlibOutputBuffer) {
-        free(zlibOutputBuffer);
-      }
+    }
+
+    virtual void handleBufferRead (NetBuffer& buffer)
+    {
+      if( !activeRequest || !activeRequest->isComplete() )
+        return NetChat::handleBufferRead(buffer);
+
+      // Request should be aborted (signaled by setting its state to complete).
+
+      // force the state to GETTING_BODY, to simplify logic in
+      // responseComplete and handleClose
+      state = STATE_GETTING_BODY;
+      responseComplete();
     }
   
-    void setServer(const string& h, short p)
+    void setServer(const std::string& h, short p)
     {
         host = h;
         port = p;
@@ -133,28 +122,30 @@ public:
     // socket-level errors
     virtual void handleError(int error)
     {
-        if (error == ENOENT) {
-        // name lookup failure
-            // we won't have an active request yet, so the logic below won't
-            // fire to actually call setFailure. Let's fail all of the requests
+        const char* errStr = strerror(error);
+        if (!activeRequest)
+        {
+        // connection level failure, eg name lookup or routing
+        // we won't have an active request yet, so let's fail all of the
+        // requests since we presume it's a systematic failure for
+        // the host in question
             BOOST_FOREACH(Request_ptr req, sentRequests) {
-                req->setFailure(error, "hostname lookup failure");
+                req->setFailure(error, errStr);
             }
             
             BOOST_FOREACH(Request_ptr req, queuedRequests) {
-                req->setFailure(error, "hostname lookup failure");
+                req->setFailure(error, errStr);
             }
             
-        // name lookup failure, abandon all requests on this connection
             sentRequests.clear();
             queuedRequests.clear();
         }
         
         NetChat::handleError(error);
         if (activeRequest) {            
-            SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
-            activeRequest->setFailure(error, "socket error");
+            activeRequest->setFailure(error, errStr);
             activeRequest = NULL;
+            _contentDecoder.reset();
         }
     
         state = STATE_SOCKET_ERROR;
@@ -182,6 +173,7 @@ public:
                     sentRequests.erase(it);
                 }
                 activeRequest = NULL;
+                _contentDecoder.reset();
             }
             
             state = STATE_CLOSED;
@@ -198,18 +190,37 @@ public:
       sentRequests.clear();
     }
     
+    void handleTimeout()
+    {
+        NetChat::handleError(ETIMEDOUT);
+        if (activeRequest) {
+            SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
+            activeRequest->setFailure(ETIMEDOUT, "socket timeout");
+            activeRequest = NULL;
+            _contentDecoder.reset();
+        }
+        
+        state = STATE_SOCKET_ERROR;
+    }
+    
     void queueRequest(const Request_ptr& r)
     {
-      queuedRequests.push_back(r);
-      tryStartNextRequest();
+        queuedRequests.push_back(r);
+        tryStartNextRequest();
     }
     
     void beginResponse()
     {
-      assert(!sentRequests.empty());
-      
-      activeRequest = sentRequests.front();      
-      activeRequest->responseStart(buffer);
+        assert(!sentRequests.empty());
+        assert(state == STATE_WAITING_FOR_RESPONSE);
+        
+        activeRequest = sentRequests.front();
+        try {
+            activeRequest->responseStart(buffer);
+        } catch (sg_exception& e) {
+            handleError(EIO);
+        }
+        
       state = STATE_GETTING_HEADERS;
       buffer.clear();
       if (activeRequest->responseCode() == 204) {
@@ -222,11 +233,15 @@ public:
 
       bodyTransferSize = -1;
       chunkedTransfer = false;
-      contentGZip = contentDeflate = false;
+      _contentDecoder.reset();
     }
   
     void tryStartNextRequest()
     {
+      while( !queuedRequests.empty()
+          && queuedRequests.front()->isComplete() )
+        queuedRequests.pop_front();
+
       if (queuedRequests.empty()) {
         idleTime.stamp();
         return;
@@ -238,6 +253,7 @@ public:
       
       if (state == STATE_CLOSED) {
           if (!connectToHost()) {
+            
               return;
           }
           
@@ -247,28 +263,28 @@ public:
      
       Request_ptr r = queuedRequests.front();
       r->requestStart();
-      requestBodyBytesToSend = r->requestBodyLength();
-          
-      stringstream headerData;
-      string path = r->path();
+
+      std::stringstream headerData;
+      std::string path = r->path();
       assert(!path.empty());
-      string query = r->query();
-      string bodyData;
+      std::string query = r->query();
+      std::string bodyData;
       
       if (!client->proxyHost().empty()) {
           path = r->scheme() + "://" + r->host() + r->path();
       }
 
-      if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
+      if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
           bodyData = query.substr(1); // URL-encode, drop the leading '?'
           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
           headerData << "Content-Length:" << bodyData.size() << "\r\n";
       } else {
           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
-          if (requestBodyBytesToSend >= 0) {
-            headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
-            headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
+          if( r->hasBodyData() )
+          {
+            headerData << "Content-Length:" << r->bodyLength() << "\r\n";
+            headerData << "Content-Type:" << r->bodyType() << "\r\n";
           }
       }
       
@@ -279,8 +295,8 @@ public:
           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
       }
 
-      BOOST_FOREACH(string h, r->requestHeaders()) {
-          headerData << h << ": " << r->header(h) << "\r\n";
+      BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
+          headerData << h.first << ": " << h.second << "\r\n";
       }
 
       headerData << "\r\n"; // final CRLF to terminate the headers
@@ -295,170 +311,62 @@ public:
           // drain down before trying to start any more requests.
           return;
       }
-      
-      while (requestBodyBytesToSend > 0) {
-        char buf[4096];
-        int len = r->getBodyData(buf, 4096);
-        if (len > 0) {
-          requestBodyBytesToSend -= len;
-          if (!bufferSend(buf, len)) {
-            SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
-            state = STATE_SOCKET_ERROR;
-            return;
+
+      if( r->hasBodyData() )
+        for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
+        {
+          char buf[4096];
+          size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
+          if( len )
+          {
+            if( !bufferSend(buf, len) )
+            {
+              SG_LOG(SG_IO,
+                     SG_WARN,
+                     "overflow the HTTP::Connection output buffer");
+              state = STATE_SOCKET_ERROR;
+              return;
+            }
+            body_bytes_sent += len;
+          }
+          else
+          {
+            SG_LOG(SG_IO,
+                   SG_WARN,
+                   "HTTP asynchronous request body generation is unsupported");
+            break;
           }
-      //    SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
-        } else {
-          SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
-          break;
         }
-      }
       
-      SG_LOG(SG_IO, SG_DEBUG, "did start request:" << r->url() <<
-          "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
-          "\n\t on connection " << this);
-    // successfully sent, remove from queue, and maybe send the next
+      //   SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
+      //       "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
+      //      "\n\t on connection " << this);
+      // successfully sent, remove from queue, and maybe send the next
       queuedRequests.pop_front();
       sentRequests.push_back(r);
-      
-    // pipelining, let's maybe send the next request right away
+      state = STATE_WAITING_FOR_RESPONSE;
+        
+      // pipelining, let's maybe send the next request right away
       tryStartNextRequest();
     }
     
     virtual void collectIncomingData(const char* s, int n)
     {
         idleTime.stamp();
-        if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
-          if (contentGZip || contentDeflate) {
-            expandCompressedData(s, n);
-          } else {
-            activeRequest->processBodyBytes(s, n);
-          }
-        } else {
-            buffer += string(s, n);
-        }
-    }
+        client->receivedBytes(static_cast<unsigned int>(n));
 
-    
-    void expandCompressedData(const char* s, int n)
-    {
-      int reqSize = n + zlib.avail_in;
-      if (reqSize > zlibInflateBufferSize) {
-      // reallocate
-        unsigned char* newBuf = (unsigned char*) malloc(reqSize);
-        memcpy(newBuf, zlib.next_in, zlib.avail_in);
-        memcpy(newBuf + zlib.avail_in, s, n);
-        free(zlibInflateBuffer);
-        zlibInflateBuffer = newBuf;
-        zlibInflateBufferSize = reqSize;
-      } else {
-      // important to use memmove here, since it's very likely
-      // the source and destination ranges overlap
-        memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
-        memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
-      }
-            
-      zlib.next_in = (unsigned char*) zlibInflateBuffer;
-      zlib.avail_in = reqSize;
-      zlib.next_out = zlibOutputBuffer;
-      zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
-      
-      if (contentGZip && !handleGZipHeader()) {
-          return;
-      }
-      
-      int writtenSize = 0;
-      do {
-        int result = inflate(&zlib, Z_NO_FLUSH);
-        if (result == Z_OK || result == Z_STREAM_END) {
-            // nothing to do
-        } else {
-          SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
-          return;
-        }
-            
-        writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
-        if (result == Z_STREAM_END) {
-            break;
-        }
-      } while ((writtenSize == 0) && (zlib.avail_in > 0));
-    
-      if (writtenSize > 0) {
-        activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
-      }
+        if(   (state == STATE_GETTING_BODY)
+           || (state == STATE_GETTING_CHUNKED_BYTES) )
+          _contentDecoder.receivedBytes(s, n);
+        else
+          buffer.append(s, n);
     }
-    
-    bool handleGZipHeader()
-    {
-        // we clear this down to contentDeflate once the GZip header has been seen
-        if (zlib.avail_in < GZIP_HEADER_SIZE) {
-          return false; // need more header bytes
-        }
-        
-        if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
-            (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
-            (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
-        {
-          return false; // invalid GZip header
-        }
-        
-        char flags = zlibInflateBuffer[3];
-        unsigned int gzipHeaderSize =  GZIP_HEADER_SIZE;
-        if (flags & GZIP_HEADER_FEXTRA) {
-          gzipHeaderSize += 2;
-          if (zlib.avail_in < gzipHeaderSize) {
-            return false; // need more header bytes
-          }
-          
-          unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
-          if ( sgIsBigEndian() ) {
-              sgEndianSwap( &extraHeaderBytes );
-          }
-          
-          gzipHeaderSize += extraHeaderBytes;
-          if (zlib.avail_in < gzipHeaderSize) {
-            return false; // need more header bytes
-          }
-        }
-        
-        if (flags & GZIP_HEADER_FNAME) {
-          gzipHeaderSize++;
-          while (gzipHeaderSize <= zlib.avail_in) {
-            if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
-              break; // found terminating NULL character
-            }
-          }
-        }
-        
-        if (flags & GZIP_HEADER_COMMENT) {
-          gzipHeaderSize++;
-          while (gzipHeaderSize <= zlib.avail_in) {
-            if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
-              break; // found terminating NULL character
-            }
-          }
-        }
-        
-        if (flags & GZIP_HEADER_CRC) {
-          gzipHeaderSize += 2;
-        }
-        
-        if (zlib.avail_in < gzipHeaderSize) {
-          return false; // need more header bytes
-        }
-        
-        zlib.next_in += gzipHeaderSize;
-        zlib.avail_in -= gzipHeaderSize;
-      // now we've processed the GZip header, can decode as deflate
-        contentGZip = false;
-        contentDeflate = true;
-        return true;
-    }
-    
+
     virtual void foundTerminator(void)
     {
         idleTime.stamp();
         switch (state) {
-        case STATE_IDLE:
+        case STATE_WAITING_FOR_RESPONSE:
             beginResponse();
             break;
             
@@ -487,6 +395,9 @@ public:
             buffer.clear();
             break;
         
+        case STATE_IDLE:
+            SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
+                
         default:
             break;
         }
@@ -498,6 +409,7 @@ public:
             return false;
         }
         
+        assert(sentRequests.empty());
         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
     }
   
@@ -544,37 +456,9 @@ private:
     
     void processHeader()
     {
-        string h = strutils::simplify(buffer);
+        std::string h = strutils::simplify(buffer);
         if (h.empty()) { // blank line terminates headers
             headersComplete();
-            
-            if (contentGZip || contentDeflate) {
-                memset(&zlib, 0, sizeof(z_stream));
-                if (!zlibOutputBuffer) {
-                    zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
-                }
-            
-                // NULLs means we'll get default alloc+free methods
-                // which is absolutely fine
-                zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
-                zlib.next_out = zlibOutputBuffer;
-                if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
-                  SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
-                }
-            }
-          
-            if (chunkedTransfer) {
-                state = STATE_GETTING_CHUNKED;
-            } else if (noMessageBody || (bodyTransferSize == 0)) {
-                // force the state to GETTING_BODY, to simplify logic in
-                // responseComplete and handleClose
-                state = STATE_GETTING_BODY;
-                responseComplete();
-            } else {
-                setByteCount(bodyTransferSize); // may be -1, that's fine              
-                state = STATE_GETTING_BODY;
-            }
-            
             return;
         }
               
@@ -584,9 +468,9 @@ private:
             return;
         }
         
-        string key = strutils::simplify(buffer.substr(0, colonPos));
-        string lkey = boost::to_lower_copy(key);
-        string value = strutils::strip(buffer.substr(colonPos + 1));
+        std::string key = strutils::simplify(buffer.substr(0, colonPos));
+        std::string lkey = boost::to_lower_copy(key);
+        std::string value = strutils::strip(buffer.substr(colonPos + 1));
         
         // only consider these if getting headers (as opposed to trailers 
         // of a chunked transfer)
@@ -603,20 +487,14 @@ private:
             } else if (lkey == "transfer-encoding") {
                 processTransferEncoding(value);
             } else if (lkey == "content-encoding") {
-              if (value == "gzip") {
-                contentGZip = true;
-              } else if (value == "deflate") {
-                contentDeflate = true;
-              } else if (value != "identity") {
-                SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
-              }
+                _contentDecoder.setEncoding(value);
             }
         }
     
         activeRequest->responseHeader(lkey, value);
     }
     
-    void processTransferEncoding(const string& te)
+    void processTransferEncoding(const std::string& te)
     {
         if (te == "chunked") {
             chunkedTransfer = true;
@@ -667,17 +545,25 @@ private:
     void headersComplete()
     {
         activeRequest->responseHeadersComplete();
+        _contentDecoder.initWithRequest(activeRequest);
+      
+        if (chunkedTransfer) {
+            state = STATE_GETTING_CHUNKED;
+        } else if (noMessageBody || (bodyTransferSize == 0)) {
+            // force the state to GETTING_BODY, to simplify logic in
+            // responseComplete and handleClose
+            state = STATE_GETTING_BODY;
+            responseComplete();
+        } else {
+            setByteCount(bodyTransferSize); // may be -1, that's fine
+            state = STATE_GETTING_BODY;
+        }
     }
     
     void responseComplete()
     {
-     //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
-        activeRequest->responseComplete();
-        client->requestFinished(this);
-      
-        if (contentDeflate) {
-          inflateEnd(&zlib);
-        }
+        Request_ptr completedRequest = activeRequest;
+        _contentDecoder.finish();
       
         assert(sentRequests.front() == activeRequest);
         sentRequests.pop_front();
@@ -688,21 +574,29 @@ private:
             if (doClose) {
           // this will bring us into handleClose() above, which updates
           // state to STATE_CLOSED
-            close();
+              close();
               
           // if we have additional requests waiting, try to start them now
-            tryStartNextRequest();
-          }
+              tryStartNextRequest();
+            }
         }
         
-      if (state != STATE_CLOSED)  {
-        state = STATE_IDLE;
-      }
-      setTerminator("\r\n");
+        if (state != STATE_CLOSED)  {
+            state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
+        }
+        
+    // notify request after we change state, so this connection is idle
+    // if completion triggers other requests (which is likely)
+        //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
+        completedRequest->responseComplete();
+        client->requestFinished(this);
+        
+        setTerminator("\r\n");
     }
     
     enum ConnectionState {
         STATE_IDLE = 0,
+        STATE_WAITING_FOR_RESPONSE,
         STATE_GETTING_HEADERS,
         STATE_GETTING_BODY,
         STATE_GETTING_CHUNKED,
@@ -715,23 +609,18 @@ private:
     Client* client;
     Request_ptr activeRequest;
     ConnectionState state;
-    string host;
+    std::string host;
     short port;
     std::string buffer;
     int bodyTransferSize;
     SGTimeStamp idleTime;
     bool chunkedTransfer;
     bool noMessageBody;
-    int requestBodyBytesToSend;
-  
-    z_stream zlib;
-    unsigned char* zlibInflateBuffer;
-    int zlibInflateBufferSize;
-    unsigned char* zlibOutputBuffer;
-    bool contentGZip, contentDeflate;
-  
+    
     RequestList queuedRequests;
     RequestList sentRequests;
+    
+    ContentDecoder _contentDecoder;
 };
 
 Client::Client() :
@@ -739,6 +628,11 @@ Client::Client() :
 {
     d->proxyPort = 0;
     d->maxConnections = 4;
+    d->bytesTransferred = 0;
+    d->lastTransferRate = 0;
+    d->timeTransferSample.stamp();
+    d->totalBytesDownloaded = 0;
+    
     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
 }
 
@@ -757,9 +651,13 @@ void Client::setMaxConnections(unsigned int maxCon)
 
 void Client::update(int waitTimeout)
 {
-    d->poller.poll(waitTimeout);
-    bool waitingRequests = !d->pendingRequests.empty();
+    if (!d->poller.hasChannels() && (waitTimeout > 0)) {
+        SGTimeStamp::sleepForMSec(waitTimeout);
+    } else {
+        d->poller.poll(waitTimeout);
+    }
     
+    bool waitingRequests = !d->pendingRequests.empty();
     ConnectionDict::iterator it = d->connections.begin();
     for (; it != d->connections.end(); ) {
         Connection* con = it->second;
@@ -768,6 +666,11 @@ void Client::update(int waitTimeout)
             con->hasErrorTimeout() ||
             (!con->isActive() && waitingRequests))
         {
+            if (con->hasErrorTimeout()) {
+                // tell the connection we're timing it out
+                con->handleTimeout();
+            }
+            
         // connection has been idle for a while, clean it up
         // (or if we have requests waiting for a different host,
         // or an error condition
@@ -797,7 +700,20 @@ void Client::update(int waitTimeout)
 
 void Client::makeRequest(const Request_ptr& r)
 {
-    string host = r->host();
+    if( r->isComplete() )
+      return;
+
+    if( r->url().find("://") == std::string::npos ) {
+        r->setFailure(EINVAL, "malformed URL");
+        return;
+    }
+
+    if( r->url().find("http://") != 0 ) {
+        r->setFailure(EINVAL, "only HTTP protocol is supported");
+        return;
+    }
+    
+    std::string host = r->host();
     int port = r->port();
     if (!d->proxy.empty()) {
         host = d->proxy;
@@ -805,47 +721,47 @@ void Client::makeRequest(const Request_ptr& r)
     }
     
     Connection* con = NULL;
-    stringstream ss;
+    std::stringstream ss;
     ss << host << "-" << port;
-    string connectionId = ss.str();
+    std::string connectionId = ss.str();
     bool havePending = !d->pendingRequests.empty();
+    bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
     ConnectionDict::iterator consEnd = d->connections.end();
      
     // assign request to an existing Connection.
     // various options exist here, examined in order
-    if (d->connections.size() >= d->maxConnections) {
-        ConnectionDict::iterator it = d->connections.find(connectionId);
-        if (it == consEnd) {
-            // maximum number of connections active, queue this request
-            // when a connection goes inactive, we'll start this one
-            d->pendingRequests.push_back(r);
-            return;
-        }
-        
-        // scan for an idle Connection to the same host (likely if we're
-        // retrieving multiple resources from the same host in quick succession)
-        // if we have pending requests (waiting for a free Connection), then
-        // force new requests on this id to always use the first Connection
-        // (instead of the random selection below). This ensures that when
-        // there's pressure on the number of connections to keep alive, one
-        // host can't DoS every other.
-        int count = 0;
-        for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
-            if (havePending || !it->second->isActive()) {
-                con = it->second;
-                break;
-            }
-        }
-        
-        if (!con) {
-            // we have at least one connection to the host, but they are
-            // all active - we need to pick one to queue the request on.
-            // we use random but round-robin would also work.
-            int index = rand() % count;
-            for (it = d->connections.find(connectionId); index > 0; --index) { ; }
+    ConnectionDict::iterator it = d->connections.find(connectionId);
+    if (atConnectionsLimit && (it == consEnd)) {
+        // maximum number of connections active, queue this request
+        // when a connection goes inactive, we'll start this one            
+        d->pendingRequests.push_back(r);
+        return;
+    }
+    
+    // scan for an idle Connection to the same host (likely if we're
+    // retrieving multiple resources from the same host in quick succession)
+    // if we have pending requests (waiting for a free Connection), then
+    // force new requests on this id to always use the first Connection
+    // (instead of the random selection below). This ensures that when
+    // there's pressure on the number of connections to keep alive, one
+    // host can't DoS every other.
+    int count = 0;
+    for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
+        if (havePending || !it->second->isActive()) {
             con = it->second;
+            break;
         }
-    } // of at max connections limit
+    }
+    
+    if (!con && atConnectionsLimit) {
+        // all current connections are busy (active), and we don't
+        // have free connections to allocate, so let's assign to
+        // an existing one randomly. Ideally we'd used whichever one will
+        // complete first but we don't have that info.
+        int index = rand() % count;
+        for (it = d->connections.find(connectionId); index > 0; --index) { ; }
+        con = it->second;
+    }
     
     // allocate a new connection object
     if (!con) {
@@ -859,12 +775,29 @@ void Client::makeRequest(const Request_ptr& r)
     con->queueRequest(r);
 }
 
+//------------------------------------------------------------------------------
+FileRequestRef Client::save( const std::string& url,
+                             const std::string& filename )
+{
+  FileRequestRef req = new FileRequest(url, filename);
+  makeRequest(req);
+  return req;
+}
+
+//------------------------------------------------------------------------------
+MemoryRequestRef Client::load(const std::string& url)
+{
+  MemoryRequestRef req = new MemoryRequest(url);
+  makeRequest(req);
+  return req;
+}
+
 void Client::requestFinished(Connection* con)
 {
     
 }
 
-void Client::setUserAgent(const string& ua)
+void Client::setUserAgent(const std::string& ua)
 {
     d->userAgent = ua;
 }
@@ -884,7 +817,9 @@ const std::string& Client::proxyAuth() const
     return d->proxyAuth;
 }
 
-void Client::setProxy(const string& proxy, int port, const string& auth)
+void Client::setProxy( const std::string& proxy,
+                       int port,
+                       const std::string& auth )
 {
     d->proxy = proxy;
     d->proxyPort = port;
@@ -901,6 +836,43 @@ bool Client::hasActiveRequests() const
     return false;
 }
 
+void Client::receivedBytes(unsigned int count)
+{
+    d->bytesTransferred += count;
+    d->totalBytesDownloaded += count;
+}
+    
+unsigned int Client::transferRateBytesPerSec() const
+{
+    unsigned int e = d->timeTransferSample.elapsedMSec();
+    if (e > 400) {
+        // too long a window, ignore
+        d->timeTransferSample.stamp();
+        d->bytesTransferred = 0;
+        d->lastTransferRate = 0;
+        return 0;
+    }
+    
+    if (e < 100) { // avoid really narrow windows
+        return d->lastTransferRate;
+    }
+    
+    unsigned int ratio = (d->bytesTransferred * 1000) / e;
+    // run a low-pass filter
+    unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
+    smoothed /= 400;
+        
+    d->timeTransferSample.stamp();
+    d->bytesTransferred = 0;
+    d->lastTransferRate = smoothed;
+    return smoothed;
+}
+
+uint64_t Client::totalBytesDownloaded() const
+{
+    return d->totalBytesDownloaded;
+}
+
 } // of namespace HTTP
 
 } // of namespace simgear