]> git.mxchange.org Git - simgear.git/commitdiff
HTTP enhancements.
authorJames Turner <zakalawe@mac.com>
Wed, 15 Aug 2012 10:42:12 +0000 (11:42 +0100)
committerJames Turner <zakalawe@mac.com>
Sat, 27 Oct 2012 17:03:51 +0000 (18:03 +0100)
Support content-encoding and improve pipelining support.

simgear/io/HTTPClient.cxx
simgear/io/HTTPClient.hxx
simgear/io/HTTPRequest.cxx
simgear/io/HTTPRequest.hxx
simgear/io/test_HTTP.cxx

index cf19589e7db29553e1da589505cc4c1cdc45e21e..99fb67b3aaf6c3b1446061107634950ce9f7e235 100644 (file)
@@ -3,11 +3,15 @@
 #include <sstream>
 #include <cassert>
 #include <list>
+#include <iostream>
 
 #include <boost/foreach.hpp>
 #include <boost/algorithm/string/case_conv.hpp>
 
+#include <zlib.h>
+
 #include <simgear/io/sg_netChat.hxx>
+#include <simgear/io/lowlevel.hxx>
 #include <simgear/misc/strutils.hxx>
 #include <simgear/compiler.h>
 #include <simgear/debug/logstream.hxx>
@@ -33,18 +37,46 @@ namespace HTTP
 
 extern const int DEFAULT_HTTP_PORT = 80;
 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
-
+const int MAX_INFLIGHT_REQUESTS = 32;
+const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
+const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
+  
+// see http://www.ietf.org/rfc/rfc1952.txt for these values and
+// detailed description of the logic 
+const int GZIP_HEADER_ID1 = 31;
+const int GZIP_HEADER_ID2 = 139;
+const int GZIP_HEADER_METHOD_DEFLATE = 8;
+const int GZIP_HEADER_SIZE = 10;
+const int GZIP_HEADER_FEXTRA = 1 << 2;
+const int GZIP_HEADER_FNAME = 1 << 3;
+const int GZIP_HEADER_COMMENT = 1 << 4;
+const int GZIP_HEADER_CRC = 1 << 1;
+  
 class Connection : public NetChat
 {
 public:
     Connection(Client* pr) :
         client(pr),
         state(STATE_CLOSED),
-        port(DEFAULT_HTTP_PORT)
+        port(DEFAULT_HTTP_PORT),
+        zlibInflateBuffer(NULL),
+        zlibInflateBufferSize(0),
+        zlibOutputBuffer(NULL)
     {
         
     }
     
+    virtual ~Connection()
+    {
+      if (zlibInflateBuffer) {
+        free(zlibInflateBuffer);
+      }
+      
+      if (zlibOutputBuffer) {
+        free(zlibOutputBuffer);
+      }
+    }
+  
     void setServer(const string& h, short p)
     {
         host = h;
@@ -65,7 +97,7 @@ public:
     }
     
     virtual void handleClose()
-    {
+    {      
         NetChat::handleClose();
         
         if ((state == STATE_GETTING_BODY) && activeRequest) {
@@ -76,95 +108,266 @@ public:
         } else {
             state = STATE_CLOSED;
         }
+      
+      if (sentRequests.empty()) {
+        return;
+      }
+      
+    // restore sent requests to the queue, so they will be re-sent
+    // when the connection opens again
+      queuedRequests.insert(queuedRequests.begin(),
+                              sentRequests.begin(), sentRequests.end());
+      sentRequests.clear();
     }
     
     void queueRequest(const Request_ptr& r)
     {
-        if (!activeRequest) {
-            startRequest(r);
+      queuedRequests.push_back(r);
+      tryStartNextRequest();
+    }
+    
+    void beginResponse()
+    {
+      assert(!sentRequests.empty());
+      
+      activeRequest = sentRequests.front();
+      activeRequest->responseStart(buffer);
+      state = STATE_GETTING_HEADERS;
+      buffer.clear();
+      if (activeRequest->responseCode() == 204) {
+        noMessageBody = true;
+      } else if (activeRequest->method() == "HEAD") {
+        noMessageBody = true;
+      } else {
+        noMessageBody = false;
+      }
+
+      bodyTransferSize = -1;
+      chunkedTransfer = false;
+      contentGZip = contentDeflate = false;
+    }
+  
+    void tryStartNextRequest()
+    {
+      if (queuedRequests.empty()) {
+        idleTime.stamp();
+        return;
+      }
+      
+      if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
+        return;
+      }
+      
+      if (state == STATE_CLOSED) {
+          if (!connectToHost()) {
+              return;
+          }
+          
+          setTerminator("\r\n");
+          state = STATE_IDLE;
+      }
+     
+      Request_ptr r = queuedRequests.front();
+      requestBodyBytesToSend = r->requestBodyLength();
+    
+      stringstream headerData;
+      string path = r->path();
+      string query = r->query();
+      string bodyData;
+      
+      if (!client->proxyHost().empty()) {
+          path = r->scheme() + "://" + r->host() + r->path();
+      }
+
+      if (r->method() == "POST") {
+          headerData << r->method() << " " << path << " HTTP/1.1\r\n";
+          bodyData = query.substr(1); // URL-encode, drop the leading '?'
+          headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
+          headerData << "Content-Length:" << bodyData.size() << "\r\n";
+      } else {
+          headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
+          if (requestBodyBytesToSend >= 0) {
+            headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
+            headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
+          }
+      }
+      
+      headerData << "Host: " << r->hostAndPort() << "\r\n";
+      headerData << "User-Agent:" << client->userAgent() << "\r\n";
+      headerData << "Accept-Encoding: deflate, gzip\r\n";
+      if (!client->proxyAuth().empty()) {
+          headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
+      }
+
+      BOOST_FOREACH(string h, r->requestHeaders()) {
+          headerData << h << ": " << r->header(h) << "\r\n";
+      }
+
+      headerData << "\r\n"; // final CRLF to terminate the headers
+      if (!bodyData.empty()) {
+          headerData << bodyData;
+      }
+      
+      bool ok = push(headerData.str().c_str());
+      if (!ok) {
+          // we've over-stuffed the socket, give up for now, let things
+          // drain down before trying to start any more requests.
+          return;
+      }
+      
+      while (requestBodyBytesToSend > 0) {
+        char buf[4096];
+        int len = 4096;
+        r->getBodyData(buf, len);
+        if (len > 0) {
+          requestBodyBytesToSend -= len;
+          if (!bufferSend(buf, len)) {
+            SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
+            state = STATE_SOCKET_ERROR;
+            return;
+          }
         } else {
-            queuedRequests.push_back(r);
+          SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
+          break;
         }
+      }
+      
+      //std::cout << "did send request:" << r->url() << std::endl;
+    // successfully sent, remove from queue, and maybe send the next
+      queuedRequests.pop_front();
+      sentRequests.push_back(r);
+      
+    // pipelining, let's maybe send the next request right away
+      tryStartNextRequest();
     }
     
-    void startRequest(const Request_ptr& r)
+    virtual void collectIncomingData(const char* s, int n)
     {
-        if (state == STATE_CLOSED) {
-            if (!connectToHost()) {
-                return;
-            }
+        idleTime.stamp();
+        if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
+          if (contentGZip || contentDeflate) {
+            expandCompressedData(s, n);
+          } else {
+            activeRequest->processBodyBytes(s, n);
+          }
+        } else {
+            buffer += string(s, n);
+        }
+    }
+
+    
+    void expandCompressedData(const char* s, int n)
+    {
+      int reqSize = n + zlib.avail_in;
+      if (reqSize > zlibInflateBufferSize) {
+      // reallocate
+        unsigned char* newBuf = (unsigned char*) malloc(reqSize);
+        memcpy(newBuf, zlib.next_in, zlib.avail_in);
+        memcpy(newBuf + zlib.avail_in, s, n);
+        free(zlibInflateBuffer);
+        zlibInflateBuffer = newBuf;
+        zlibInflateBufferSize = reqSize;
+      } else {
+      // important to use memmove here, since it's very likely
+      // the source and destination ranges overlap
+        memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
+        memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
+      }
             
-            state = STATE_IDLE;
+      zlib.next_in = (unsigned char*) zlibInflateBuffer;
+      zlib.avail_in = reqSize;
+      zlib.next_out = zlibOutputBuffer;
+      zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
+      
+      if (contentGZip) {
+        // we clear this down to contentDeflate once the GZip header has been seen
+        if (reqSize < GZIP_HEADER_SIZE) {
+          return; // need more header bytes
         }
-                
-        activeRequest = r;
-        state = STATE_SENT_REQUEST;
-        bodyTransferSize = -1;
-        chunkedTransfer = false;
-        noMessageBody = (r->method() == "HEAD");
-        setTerminator("\r\n");
         
-        stringstream headerData;
-        string path = r->path();
-        string query = r->query();
-        string bodyData;
+        if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
+            (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
+            (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
+        {
+          return; // invalid GZip header
+        }
         
-        if (!client->proxyHost().empty()) {
-            path = r->scheme() + "://" + r->host() + r->path();
+        char flags = zlibInflateBuffer[3];
+        int gzipHeaderSize =  GZIP_HEADER_SIZE;
+        if (flags & GZIP_HEADER_FEXTRA) {
+          gzipHeaderSize += 2;
+          if (reqSize < gzipHeaderSize) {
+            return; // need more header bytes
+          }
+          
+          unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
+          if ( sgIsBigEndian() ) {
+              sgEndianSwap( &extraHeaderBytes );
+          }
+          
+          gzipHeaderSize += extraHeaderBytes;
+          if (reqSize < gzipHeaderSize) {
+            return; // need more header bytes
+          }
         }
-
-        if (r->method() == "POST") {
-            headerData << r->method() << " " << path << " HTTP/1.1\r\n";
-            bodyData = query.substr(1); // URL-encode, drop the leading '?'
-            headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
-            headerData << "Content-Length:" << bodyData.size() << "\r\n";
-        } else {
-            headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
+        
+        if (flags & GZIP_HEADER_FNAME) {
+          gzipHeaderSize++;
+          while (gzipHeaderSize <= reqSize) {
+            if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
+              break; // found terminating NULL character
+            }
+          }
         }
         
-        headerData << "Host: " << r->hostAndPort() << "\r\n";
-        headerData << "User-Agent:" << client->userAgent() << "\r\n";
-        if (!client->proxyAuth().empty()) {
-            headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
+        if (flags & GZIP_HEADER_COMMENT) {
+          gzipHeaderSize++;
+          while (gzipHeaderSize <= reqSize) {
+            if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
+              break; // found terminating NULL character
+            }
+          }
         }
-
-        BOOST_FOREACH(string h, r->requestHeaders()) {
-            headerData << h << ": " << r->header(h) << "\r\n";
+        
+        if (flags & GZIP_HEADER_CRC) {
+          gzipHeaderSize += 2;
         }
-
-        headerData << "\r\n"; // final CRLF to terminate the headers
-        if (!bodyData.empty()) {
-            headerData << bodyData;
+        
+        if (reqSize < gzipHeaderSize) {
+          return; // need more header bytes
         }
         
-        bool ok = push(headerData.str().c_str());
-        if (!ok) {
-            SG_LOG(SG_IO, SG_WARN, "HTTP writing to socket failed");
-            state = STATE_SOCKET_ERROR;
-            return;
-        }        
-    }
-    
-    virtual void collectIncomingData(const char* s, int n)
-    {
-        if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
-            activeRequest->processBodyBytes(s, n);
+        zlib.next_in += gzipHeaderSize;
+        zlib.avail_in = reqSize - gzipHeaderSize;
+      // now we've processed the GZip header, can decode as deflate
+        contentGZip = false;
+        contentDeflate = true;
+      }
+      
+      int writtenSize = 0;
+      do {
+        int result = inflate(&zlib, Z_NO_FLUSH);
+        if (result == Z_OK || result == Z_STREAM_END) {
+              
         } else {
-            buffer += string(s, n);
+          SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
+          return;
         }
+            
+        writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
+      } while ((writtenSize == 0) && (zlib.avail_in > 0));
+    
+      if (writtenSize > 0) {
+        activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
+      }
     }
     
     virtual void foundTerminator(void)
     {
+        idleTime.stamp();
         switch (state) {
-        case STATE_SENT_REQUEST:
-            activeRequest->responseStart(buffer);
-            state = STATE_GETTING_HEADERS;
-            buffer.clear();
-            if (activeRequest->responseCode() == 204) {
-                noMessageBody = true;
-            }
-            
+        case STATE_IDLE:
+            beginResponse();
             break;
             
         case STATE_GETTING_HEADERS:
@@ -204,6 +407,15 @@ public:
         
         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
     }
+  
+    bool hasErrorTimeout() const
+    {
+      if (state == STATE_IDLE) {
+        return false;
+      }
+      
+      return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
+    }
     
     bool hasError() const
     {
@@ -212,15 +424,7 @@ public:
     
     bool shouldStartNext() const
     {
-        return !activeRequest && !queuedRequests.empty() && 
-            ((state == STATE_CLOSED) || (state == STATE_IDLE));
-    }
-    
-    void startNext()
-    {
-        Request_ptr next = queuedRequests.front();
-        queuedRequests.pop_front();
-        startRequest(next);
+      return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
     }
 private:
     bool connectToHost()
@@ -246,6 +450,21 @@ private:
         if (h.empty()) { // blank line terminates headers
             headersComplete();
             
+            if (contentGZip || contentDeflate) {
+                bzero(&zlib, sizeof(z_stream));
+                if (!zlibOutputBuffer) {
+                    zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
+                }
+            
+                // NULLs means we'll get default alloc+free methods
+                // which is absolutely fine
+                zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
+                zlib.next_out = zlibOutputBuffer;
+                if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
+                  SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
+                }
+            }
+          
             if (chunkedTransfer) {
                 state = STATE_GETTING_CHUNKED;
             } else if (noMessageBody || (bodyTransferSize == 0)) {
@@ -254,13 +473,13 @@ private:
                 state = STATE_GETTING_BODY;
                 responseComplete();
             } else {
-                setByteCount(bodyTransferSize); // may be -1, that's fine
+                setByteCount(bodyTransferSize); // may be -1, that's fine              
                 state = STATE_GETTING_BODY;
             }
             
             return;
         }
-        
+              
         int colonPos = buffer.find(':');
         if (colonPos < 0) {
             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
@@ -275,6 +494,7 @@ private:
         // of a chunked transfer)
         if (state == STATE_GETTING_HEADERS) {
             if (lkey == "content-length") {
+
                 int sz = strutils::to_int(value);
                 if (bodyTransferSize <= 0) {
                     bodyTransferSize = sz;
@@ -284,6 +504,14 @@ private:
                 bodyTransferSize = strutils::to_int(value);
             } else if (lkey == "transfer-encoding") {
                 processTransferEncoding(value);
+            } else if (lkey == "content-encoding") {
+              if (value == "gzip") {
+                contentGZip = true;
+              } else if (value == "deflate") {
+                contentDeflate = true;
+              } else if (value != "identity") {
+                SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
+              }
             }
         }
     
@@ -345,36 +573,38 @@ private:
     
     void responseComplete()
     {
+      //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
         activeRequest->responseComplete();
         client->requestFinished(this);
-        
+      
+        if (contentDeflate) {
+          inflateEnd(&zlib);
+        }
+      
+        assert(sentRequests.front() == activeRequest);
+        sentRequests.pop_front();
         bool doClose = activeRequest->closeAfterComplete();
         activeRequest = NULL;
-        if (state == STATE_GETTING_BODY) {
-            state = STATE_IDLE;
+      
+        if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
             if (doClose) {
-            // this will bring us into handleClose() above, which updates
-            // state to STATE_CLOSED
-                close();
-            }
+          // this will bring us into handleClose() above, which updates
+          // state to STATE_CLOSED
+            close();
+              
+          // if we have additional requests waiting, try to start them now
+            tryStartNextRequest();
+          }
         }
         
-        setTerminator("\r\n");
-        
-    // if we have more requests, and we're idle, can start the next
-    // request immediately. Note we cannot do this if we're in STATE_CLOSED,
-    // since NetChannel::close cleans up state after calling handleClose;
-    // instead we pick up waiting requests in update()
-        if (!queuedRequests.empty() && (state == STATE_IDLE)) {
-            startNext();
-        } else {
-            idleTime.stamp();
-        }
+      if (state != STATE_CLOSED)  {
+        state = STATE_IDLE;
+      }
+      setTerminator("\r\n");
     }
     
     enum ConnectionState {
         STATE_IDLE = 0,
-        STATE_SENT_REQUEST,
         STATE_GETTING_HEADERS,
         STATE_GETTING_BODY,
         STATE_GETTING_CHUNKED,
@@ -394,8 +624,16 @@ private:
     SGTimeStamp idleTime;
     bool chunkedTransfer;
     bool noMessageBody;
-    
+    int requestBodyBytesToSend;
+  
+    z_stream zlib;
+    unsigned char* zlibInflateBuffer;
+    int zlibInflateBufferSize;
+    unsigned char* zlibOutputBuffer;
+    bool contentGZip, contentDeflate;
+  
     std::list<Request_ptr> queuedRequests;
+    std::list<Request_ptr> sentRequests;
 };
 
 Client::Client()
@@ -403,22 +641,24 @@ Client::Client()
     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
 }
 
-void Client::update()
+void Client::update(int waitTimeout)
 {
-    NetChannel::poll();
+    NetChannel::poll(waitTimeout);
         
     ConnectionDict::iterator it = _connections.begin();
     for (; it != _connections.end(); ) {
-        if (it->second->hasIdleTimeout() || it->second->hasError()) {
+        if (it->second->hasIdleTimeout() || it->second->hasError() ||
+            it->second->hasErrorTimeout())
+        {
         // connection has been idle for a while, clean it up
         // (or has an error condition, again clean it up)
-            SG_LOG(SG_IO, SG_INFO, "cleaning up " << it->second);
             ConnectionDict::iterator del = it++;
             delete del->second;
             _connections.erase(del);
         } else {
             if (it->second->shouldStartNext()) {
-                it->second->startNext();
+                SG_LOG(SG_IO, SG_INFO, "should start next, hmm");
+                it->second->tryStartNextRequest();
             }
             
             ++it;
index 7cc3771f1a16ae54919cc51345acc0a3baec18fa..f949c7bece1a33acb26adef224c5732c31cad38b 100644 (file)
@@ -18,7 +18,7 @@ class Client
 public:
     Client();
     
-    void update();
+    void update(int waitTimeout = 0);
     
     void makeRequest(const Request_ptr& r);
     
index 84855a4c7d290c879fca3160b18e4a8744437346..0e789e5ce3920c9ddbed5ac48afa82b75689981b 100644 (file)
@@ -218,6 +218,22 @@ bool Request::closeAfterComplete() const
 // for non HTTP/1.1 connections, assume server closes
     return _willClose || (_responseVersion != HTTP_1_1);
 }
+  
+int Request::requestBodyLength() const
+{
+  return -1;
+}
+
+std::string Request::requestBodyType() const
+{
+    return "text/plain";
+}
+  
+void Request::getBodyData(char*, int& count) const
+{
+  count = 0;
+  return;
+}
 
 } // of namespace HTTP
 
index 8d8a227054a12ef030523d8db8b0a2cd8e11269e..0c6e2685c6f1df956d95d1bf2388600b27c5ea8c 100644 (file)
@@ -43,7 +43,25 @@ public:
         
     void setResponseLength(unsigned int l);    
     virtual unsigned int responseLength() const;
+  
+    /**
+     * Query the size of the request body. -1 (the default value) means no
+     * request body
+     */
+    virtual int requestBodyLength() const;
     
+    /**
+     * Retrieve the body data bytes. Will be passed the maximum body bytes
+     * to return in the buffer, and should update count with the actual number
+     * of bytes written. 
+     */
+    virtual void getBodyData(char* s, int& count) const;
+  
+    /**
+     * retrieve the request body content type. Default is text/plain
+     */
+    virtual std::string requestBodyType() const;
+  
     /**
      * running total of body bytes received so far. Can be used
      * to generate a completion percentage, if the response length is
index dcafdb424dd35c8ec7bd64233375e75827582002..43d660c3a7536bebd2f39f3b53338c320976a01b 100644 (file)
@@ -22,6 +22,14 @@ using std::stringstream;
 using namespace simgear;
 
 const char* BODY1 = "The quick brown fox jumps over a lazy dog.";
+const char* BODY3 = "Cras ut neque nulla. Duis ut velit neque, sit amet "
+"pharetra risus. In est ligula, lacinia vitae congue in, sollicitudin at "
+"libero. Mauris pharetra pretium elit, nec placerat dui semper et. Maecenas "
+"magna magna, placerat sed luctus ac, commodo et ligula. Mauris at purus et "
+"nisl molestie auctor placerat at quam. Donec sapien magna, venenatis sed "
+"iaculis id, fringilla vel arcu. Duis sed neque nisi. Cras a arcu sit amet "
+"risus ultrices varius. Integer sagittis euismod dui id varius. Cras vel "
+"justo gravida metus.";
 
 const unsigned int body2Size = 8 * 1024;
 char body2[body2Size];
@@ -29,7 +37,7 @@ char body2[body2Size];
 #define COMPARE(a, b) \
     if ((a) != (b))  { \
         cerr << "failed:" << #a << " != " << #b << endl; \
-        cerr << "\tgot:" << a << endl; \
+        cerr << "\tgot:'" << a << "'" << endl; \
         exit(1); \
     }
 
@@ -91,6 +99,7 @@ protected:
     
     virtual void gotBodyData(const char* s, int n)
     {
+      //std::cout << "got body data:'" << string(s, n) << "'" <<std::endl;
         bodyData += string(s, n);
     }
     
@@ -107,6 +116,7 @@ public:
     {
         STATE_IDLE = 0,
         STATE_HEADERS,
+        STATE_CLOSING,
         STATE_REQUEST_BODY
     };
     
@@ -114,6 +124,7 @@ public:
     {
         state = STATE_IDLE;
         setTerminator("\r\n");
+        
     }
     
     virtual void collectIncomingData(const char* s, int n)
@@ -163,9 +174,10 @@ public:
             requestHeaders[key] = value;
             buffer.clear();
         } else if (state == STATE_REQUEST_BODY) {
-            cerr << "done getting requst body";
             receivedBody();
             setTerminator("\r\n");
+        } else if (state == STATE_CLOSING) {
+          // ignore!
         }
     }  
     
@@ -197,6 +209,14 @@ public:
             d << "\r\n"; // final CRLF to terminate the headers
             d << contentStr;
             push(d.str().c_str());
+        } else if (path == "/testLorem") {
+            string contentStr(BODY3);
+            stringstream d;
+            d << "HTTP/1.1 " << 200 << " " << reasonForCode(200) << "\r\n";
+            d << "Content-Length:" << contentStr.size() << "\r\n";
+            d << "\r\n"; // final CRLF to terminate the headers
+            d << contentStr;
+            push(d.str().c_str());
         } else if (path == "/test_zero_length_content") {
             string contentStr;
             stringstream d;
@@ -257,12 +277,15 @@ public:
             sendBody2();
         } else if (strutils::starts_with(path, "/test_1_0")) {
             string contentStr(BODY1);
+            if (strutils::ends_with(path, "/B")) {
+                contentStr = BODY3;
+            }
             stringstream d;
             d << "HTTP/1.0 " << 200 << " " << reasonForCode(200) << "\r\n";
             d << "\r\n"; // final CRLF to terminate the headers
             d << contentStr;
             push(d.str().c_str());
-            closeWhenDone();
+            closeAfterSending();
         } else if (path == "/test_close") {
             string contentStr(BODY1);
             stringstream d;
@@ -271,7 +294,7 @@ public:
             d << "\r\n"; // final CRLF to terminate the headers
             d << contentStr;
             push(d.str().c_str());
-            closeWhenDone();
+            closeAfterSending();
         } else if (path == "/test_args") {
             if ((args["foo"] != "abc") || (args["bar"] != "1234") || (args["username"] != "johndoe")) {
                 sendErrorResponse(400, true, "bad arguments");
@@ -300,6 +323,12 @@ public:
         }
     }
     
+    void closeAfterSending()
+    {
+      state = STATE_CLOSING;
+      closeWhenDone();
+    }
+  
     void receivedBody()
     {
         state = STATE_IDLE;
@@ -456,6 +485,19 @@ int main(int argc, char* argv[])
         COMPARE(tr->bodyData, string(BODY1));
     }
     
+    {
+      TestRequest* tr = new TestRequest("http://localhost:2000/testLorem");
+      HTTP::Request_ptr own(tr);
+      cl.makeRequest(tr);
+      
+      waitForComplete(&cl, tr);
+      COMPARE(tr->responseCode(), 200);
+      COMPARE(tr->responseReason(), string("OK"));
+      COMPARE(tr->responseLength(), strlen(BODY3));
+      COMPARE(tr->responseBytesReceived(), strlen(BODY3));
+      COMPARE(tr->bodyData, string(BODY3));
+    }
+  
     {
         TestRequest* tr = new TestRequest("http://localhost:2000/test_args?foo=abc&bar=1234&username=johndoe");
         HTTP::Request_ptr own(tr);
@@ -590,6 +632,8 @@ int main(int argc, char* argv[])
     }
     
 // pipelining
+    cout << "testing HTTP 1.1 pipelineing" << endl;
+  
     {
         cl.setProxy("", 80);
         TestRequest* tr = new TestRequest("http://localhost:2000/test1");
@@ -597,7 +641,7 @@ int main(int argc, char* argv[])
         cl.makeRequest(tr);
         
         
-        TestRequest* tr2 = new TestRequest("http://localhost:2000/test1");
+        TestRequest* tr2 = new TestRequest("http://localhost:2000/testLorem");
         HTTP::Request_ptr own2(tr2);
         cl.makeRequest(tr2);
         
@@ -609,7 +653,11 @@ int main(int argc, char* argv[])
         VERIFY(tr->complete);
         VERIFY(tr2->complete);
         COMPARE(tr->bodyData, string(BODY1));
-        COMPARE(tr2->bodyData, string(BODY1));
+      
+        COMPARE(tr2->responseLength(), strlen(BODY3));
+        COMPARE(tr2->responseBytesReceived(), strlen(BODY3));
+        COMPARE(tr2->bodyData, string(BODY3));
+      
         COMPARE(tr3->bodyData, string(BODY1));
     }
     
@@ -633,8 +681,14 @@ int main(int argc, char* argv[])
         waitForComplete(&cl, tr3);
         VERIFY(tr->complete);
         VERIFY(tr2->complete);
+        
+        COMPARE(tr->responseLength(), strlen(BODY1));
+        COMPARE(tr->responseBytesReceived(), strlen(BODY1));
         COMPARE(tr->bodyData, string(BODY1));
-        COMPARE(tr2->bodyData, string(BODY1));
+      
+        COMPARE(tr2->responseLength(), strlen(BODY3));
+        COMPARE(tr2->responseBytesReceived(), strlen(BODY3));
+        COMPARE(tr2->bodyData, string(BODY3));
         COMPARE(tr3->bodyData, string(BODY1));
     }