]> git.mxchange.org Git - simgear.git/blobdiff - simgear/io/HTTPClient.cxx
Introduce SGBinaryFile
[simgear.git] / simgear / io / HTTPClient.cxx
index 3fec10024617b20c726a4c3da4adc13933a59dec..d5ef1ea5108039ff2dd8e01eea27f0141d69be03 100644 (file)
@@ -68,7 +68,6 @@ namespace HTTP
 
 extern const int DEFAULT_HTTP_PORT = 80;
 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
-const unsigned int MAX_INFLIGHT_REQUESTS = 32;
 
 class Connection;
 typedef std::multimap<std::string, Connection*> ConnectionDict;
@@ -79,19 +78,24 @@ class Client::ClientPrivate
 public:
 #if defined(ENABLE_CURL)
     CURLM* curlMulti;
-    bool haveActiveRequests;
 
     void createCurlMulti()
     {
         curlMulti = curl_multi_init();
         // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
         // we request HTTP 1.1 pipelining
-        curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, CURLPIPE_HTTP1);
+        curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
-                          (long) MAX_INFLIGHT_REQUESTS);
+                          (long) maxPipelineDepth);
+        curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
+                          (long) maxHostConnections);
+
 
     }
+
+    typedef std::map<Request_ptr, CURL*> RequestCurlMap;
+    RequestCurlMap requests;
 #else
     NetChannelPoller poller;
 // connections by host (potentially more than one)
@@ -103,11 +107,11 @@ public:
     int proxyPort;
     std::string proxyAuth;
     unsigned int maxConnections;
+    unsigned int maxHostConnections;
+    unsigned int maxPipelineDepth;
 
     RequestList pendingRequests;
 
-
-
     SGTimeStamp timeTransferSample;
     unsigned int bytesTransferred;
     unsigned int lastTransferRate;
@@ -122,7 +126,8 @@ public:
         client(pr),
         state(STATE_CLOSED),
         port(DEFAULT_HTTP_PORT),
-        connectionId(conId)
+        _connectionId(conId),
+        _maxPipelineLength(255)
     {
     }
 
@@ -139,7 +144,7 @@ public:
 
       // force the state to GETTING_BODY, to simplify logic in
       // responseComplete and handleClose
-      state = STATE_GETTING_BODY;
+        setState(STATE_GETTING_BODY);
       responseComplete();
     }
 
@@ -149,11 +154,16 @@ public:
         port = p;
     }
 
+    void setMaxPipelineLength(unsigned int m)
+    {
+        _maxPipelineLength = m;
+    }
+
     // socket-level errors
     virtual void handleError(int error)
     {
         const char* errStr = strerror(error);
-        SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
+        SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
                << errStr << ")");
 
         debugDumpRequests();
@@ -183,7 +193,7 @@ public:
             _contentDecoder.reset();
         }
 
-        state = STATE_SOCKET_ERROR;
+        setState(STATE_SOCKET_ERROR);
     }
 
     void handleTimeout()
@@ -197,20 +207,33 @@ public:
 
     // closing of the connection from the server side when getting the body,
         bool canCloseState = (state == STATE_GETTING_BODY);
+        bool isCancelling = (state == STATE_CANCELLING);
+
         if (canCloseState && activeRequest) {
+            // check bodyTransferSize matches how much we actually transferred
+            if (bodyTransferSize > 0) {
+                if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
+                    SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
+                           << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
+                }
+            }
+
         // force state here, so responseComplete can avoid closing the
         // socket again
-            state =  STATE_CLOSED;
+            SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
+            setState(STATE_CLOSED);
             responseComplete();
         } else {
             if (state == STATE_WAITING_FOR_RESPONSE) {
+                SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
+                       << sentRequests.front()->url());
                 assert(!sentRequests.empty());
                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
                 // no active request, but don't restore the front sent one
                 sentRequests.erase(sentRequests.begin());
             }
 
-            if (activeRequest) {
+            if (activeRequest && !isCancelling) {
                 activeRequest->setFailure(500, "server closed connection");
                 // remove the failed request from sentRequests, so it does
                 // not get restored
@@ -223,7 +246,7 @@ public:
                 _contentDecoder.reset();
             }
 
-            state = STATE_CLOSED;
+            setState(STATE_CLOSED);
         }
 
       if (sentRequests.empty()) {
@@ -243,6 +266,39 @@ public:
         tryStartNextRequest();
     }
 
+    void cancelRequest(const Request_ptr& r)
+    {
+        RequestList::iterator it = std::find(sentRequests.begin(),
+                                             sentRequests.end(), r);
+        if (it != sentRequests.end()) {
+            sentRequests.erase(it);
+
+            if ((r == activeRequest) || !activeRequest) {
+                // either the cancelling request is active, or we're in waiting
+                // for response state - close now
+                setState(STATE_CANCELLING);
+                close();
+
+                setState(STATE_CLOSED);
+                activeRequest = NULL;
+                _contentDecoder.reset();
+            } else if (activeRequest) {
+                SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url());
+
+                // has been sent but not active, let the active finish and
+                // then close. Otherwise cancelling request #2 would mess up
+                // active transfer #1
+                activeRequest->setCloseAfterComplete();
+            }
+        } // of request has been sent
+
+        // simpler case, not sent yet just remove from the queue
+        it = std::find(queuedRequests.begin(), queuedRequests.end(), r);
+        if (it != queuedRequests.end()) {
+            queuedRequests.erase(it);
+        }
+    }
+
     void beginResponse()
     {
         assert(!sentRequests.empty());
@@ -250,13 +306,14 @@ public:
 
         activeRequest = sentRequests.front();
         try {
+            SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
             activeRequest->responseStart(buffer);
         } catch (sg_exception& e) {
             handleError(EIO);
             return;
         }
 
-      state = STATE_GETTING_HEADERS;
+      setState(STATE_GETTING_HEADERS);
       buffer.clear();
       if (activeRequest->responseCode() == 204) {
         noMessageBody = true;
@@ -282,18 +339,19 @@ public:
         return;
       }
 
-      if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
+      if (sentRequests.size() >= _maxPipelineLength) {
         return;
       }
 
       if (state == STATE_CLOSED) {
           if (!connectToHost()) {
-
+              setState(STATE_SOCKET_ERROR);
               return;
           }
 
+          SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
           setTerminator("\r\n");
-          state = STATE_IDLE;
+          setState(STATE_IDLE);
       }
 
       Request_ptr r = queuedRequests.front();
@@ -373,11 +431,13 @@ public:
           }
         }
 
-        SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
+        SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
       // successfully sent, remove from queue, and maybe send the next
       queuedRequests.pop_front();
       sentRequests.push_back(r);
-      state = STATE_WAITING_FOR_RESPONSE;
+      if (state == STATE_IDLE) {
+          setState(STATE_WAITING_FOR_RESPONSE);
+      }
 
       // pipelining, let's maybe send the next request right away
       tryStartNextRequest();
@@ -418,7 +478,7 @@ public:
 
         case STATE_GETTING_CHUNKED_BYTES:
             setTerminator("\r\n");
-            state = STATE_GETTING_CHUNKED;
+            setState(STATE_GETTING_CHUNKED);
             buffer.clear();
             break;
 
@@ -464,7 +524,7 @@ public:
 
     bool shouldStartNext() const
     {
-      return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
+      return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
     }
 
     bool isActive() const
@@ -472,9 +532,14 @@ public:
         return !queuedRequests.empty() || !sentRequests.empty();
     }
 
+    std::string connectionId() const
+    {
+        return _connectionId;
+    }
+
     void debugDumpRequests() const
     {
-        SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
+        SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
                << "; state=" << state << ")");
         if (activeRequest) {
             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
@@ -491,6 +556,28 @@ public:
         }
     }
 private:
+    enum ConnectionState {
+        STATE_IDLE = 0,
+        STATE_WAITING_FOR_RESPONSE,
+        STATE_GETTING_HEADERS,
+        STATE_GETTING_BODY,
+        STATE_GETTING_CHUNKED,
+        STATE_GETTING_CHUNKED_BYTES,
+        STATE_GETTING_TRAILER,
+        STATE_SOCKET_ERROR,
+        STATE_CANCELLING,        ///< cancelling an acitve request
+        STATE_CLOSED             ///< connection should be closed now
+    };
+
+    void setState(ConnectionState newState)
+    {
+        if (state == newState) {
+            return;
+        }
+
+        state = newState;
+    }
+
     bool connectToHost()
     {
         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
@@ -577,11 +664,11 @@ private:
 
         buffer.clear();
         if (chunkSize == 0) {  //  trailer start
-            state = STATE_GETTING_TRAILER;
+            setState(STATE_GETTING_TRAILER);
             return;
         }
 
-        state = STATE_GETTING_CHUNKED_BYTES;
+        setState(STATE_GETTING_CHUNKED_BYTES);
         setByteCount(chunkSize);
     }
 
@@ -602,16 +689,21 @@ private:
         activeRequest->responseHeadersComplete();
         _contentDecoder.initWithRequest(activeRequest);
 
+        if (!activeRequest->serverSupportsPipelining()) {
+            SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
+            _maxPipelineLength = 1;
+        }
+
         if (chunkedTransfer) {
-            state = STATE_GETTING_CHUNKED;
+            setState(STATE_GETTING_CHUNKED);
         } else if (noMessageBody || (bodyTransferSize == 0)) {
             // force the state to GETTING_BODY, to simplify logic in
             // responseComplete and handleClose
-            state = STATE_GETTING_BODY;
+            setState(STATE_GETTING_BODY);
             responseComplete();
         } else {
             setByteCount(bodyTransferSize); // may be -1, that's fine
-            state = STATE_GETTING_BODY;
+            setState(STATE_GETTING_BODY);
         }
     }
 
@@ -627,6 +719,7 @@ private:
 
         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
             if (doClose) {
+                SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
           // this will bring us into handleClose() above, which updates
           // state to STATE_CLOSED
               close();
@@ -637,7 +730,7 @@ private:
         }
 
         if (state != STATE_CLOSED)  {
-            state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
+            setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
         }
 
     // notify request after we change state, so this connection is idle
@@ -648,18 +741,6 @@ private:
         setTerminator("\r\n");
     }
 
-    enum ConnectionState {
-        STATE_IDLE = 0,
-        STATE_WAITING_FOR_RESPONSE,
-        STATE_GETTING_HEADERS,
-        STATE_GETTING_BODY,
-        STATE_GETTING_CHUNKED,
-        STATE_GETTING_CHUNKED_BYTES,
-        STATE_GETTING_TRAILER,
-        STATE_SOCKET_ERROR,
-        STATE_CLOSED             ///< connection should be closed now
-    };
-
     Client* client;
     Request_ptr activeRequest;
     ConnectionState state;
@@ -675,7 +756,8 @@ private:
     RequestList sentRequests;
 
     ContentDecoder _contentDecoder;
-    std::string connectionId;
+    std::string _connectionId;
+    unsigned int _maxPipelineLength;
 };
 #endif // of !ENABLE_CURL
 
@@ -684,11 +766,12 @@ Client::Client() :
 {
     d->proxyPort = 0;
     d->maxConnections = 4;
+    d->maxHostConnections = 4;
     d->bytesTransferred = 0;
     d->lastTransferRate = 0;
     d->timeTransferSample.stamp();
     d->totalBytesDownloaded = 0;
-
+    d->maxPipelineDepth = 5;
     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
 #if defined(ENABLE_CURL)
     static bool didInitCurlGlobal = false;
@@ -710,51 +793,78 @@ Client::~Client()
 
 void Client::setMaxConnections(unsigned int maxCon)
 {
-    if (maxCon < 1) {
-        throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
-    }
-
     d->maxConnections = maxCon;
 #if defined(ENABLE_CURL)
     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
 #endif
 }
 
+void Client::setMaxHostConnections(unsigned int maxHostCon)
+{
+    d->maxHostConnections = maxHostCon;
+#if defined(ENABLE_CURL)
+    curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
+#endif
+}
+
+void Client::setMaxPipelineDepth(unsigned int depth)
+{
+    d->maxPipelineDepth = depth;
+#if defined(ENABLE_CURL)
+    curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
+#else
+    ConnectionDict::iterator it = d->connections.begin();
+    for (; it != d->connections.end(); ) {
+        it->second->setMaxPipelineLength(depth);
+    }
+#endif
+}
+
 void Client::update(int waitTimeout)
 {
 #if defined(ENABLE_CURL)
     int remainingActive, messagesInQueue;
     curl_multi_perform(d->curlMulti, &remainingActive);
-    d->haveActiveRequests = (remainingActive > 0);
 
     CURLMsg* msg;
     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
       if (msg->msg == CURLMSG_DONE) {
-        Request* req;
+        Request* rawReq = 0;
         CURL *e = msg->easy_handle;
-        curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
+        curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq);
+
+        // ensure request stays valid for the moment
+        // eg if responseComplete cancels us
+        Request_ptr req(rawReq);
 
         long responseCode;
         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
 
+          // remove from the requests map now,
+          // in case the callbacks perform a cancel. We'll use
+          // the absence from the request dict in cancel to avoid
+          // a double remove
+          ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req);
+          assert(it != d->requests.end());
+          assert(it->second == e);
+          d->requests.erase(it);
+
         if (msg->data.result == 0) {
           req->responseComplete();
         } else {
-          fprintf(stderr, "Result: %d - %s\n",
-                msg->data.result, curl_easy_strerror(msg->data.result));
+          SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result));
           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
         }
 
         curl_multi_remove_handle(d->curlMulti, e);
-
-        // balance the reference we take in makeRequest
-        SGReferenced::put(req);
         curl_easy_cleanup(e);
-      }
-      else {
-        SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
+      } else {
+          // should never happen since CURLMSG_DONE is the only code
+          // defined!
+          SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg);
       }
     } // of curl message processing loop
+    SGTimeStamp::sleepForMSec(waitTimeout);
 #else
     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
         SGTimeStamp::sleepForMSec(waitTimeout);
@@ -814,12 +924,17 @@ void Client::makeRequest(const Request_ptr& r)
         return;
     }
 
+    r->_client = this;
+
 #if defined(ENABLE_CURL)
+    ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
+    assert(rit == d->requests.end());
+
     CURL* curlRequest = curl_easy_init();
     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
 
-    // manually increase the ref count of the request
-    SGReferenced::get(r.get());
+    d->requests[r] = curlRequest;
+
     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
     // disable built-in libCurl progress feedback
     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
@@ -885,9 +1000,9 @@ void Client::makeRequest(const Request_ptr& r)
     }
 
     curl_multi_add_handle(d->curlMulti, curlRequest);
-    d->haveActiveRequests = true;
 
-// FIXME - premature?
+// this seems premature, but we don't have a callback from Curl we could
+// use to trigger when the requst is actually sent.
     r->requestStart();
 
 #else
@@ -936,29 +1051,66 @@ void Client::makeRequest(const Request_ptr& r)
         }
     }
 
-    if (!con && atConnectionsLimit) {
+    bool atHostConnectionsLimit = (count >= d->maxHostConnections);
+
+    if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
         // all current connections are busy (active), and we don't
         // have free connections to allocate, so let's assign to
         // an existing one randomly. Ideally we'd used whichever one will
         // complete first but we don't have that info.
         int index = rand() % count;
-        for (it = d->connections.find(connectionId); index > 0; --index) { ; }
+        for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
         con = it->second;
     }
 
     // allocate a new connection object
     if (!con) {
-        con = new Connection(this, connectionId);
+        static int connectionSuffx = 0;
+
+        std::stringstream ss;
+        ss << connectionId << "-" << connectionSuffx++;
+
+        SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
+        con = new Connection(this, ss.str());
         con->setServer(host, port);
+        con->setMaxPipelineLength(d->maxPipelineDepth);
         d->poller.addChannel(con);
         d->connections.insert(d->connections.end(),
             ConnectionDict::value_type(connectionId, con));
     }
 
+    SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
     con->queueRequest(r);
 #endif
 }
 
+void Client::cancelRequest(const Request_ptr &r, std::string reason)
+{
+#if defined(ENABLE_CURL)
+    ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
+    if(it == d->requests.end()) {
+        // already being removed, presumably inside ::update()
+        // nothing more to do
+        return;
+    }
+
+    CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second);
+    assert(err == CURLM_OK);
+
+    // clear the request pointer form the curl-easy object
+    curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0);
+
+    curl_easy_cleanup(it->second);
+    d->requests.erase(it);
+#else
+    ConnectionDict::iterator it = d->connections.begin();
+    for (; it != d->connections.end(); ++it) {
+        (it->second)->cancelRequest(r);
+    }
+#endif
+    r->setFailure(-1, reason);
+}
+
 //------------------------------------------------------------------------------
 FileRequestRef Client::save( const std::string& url,
                              const std::string& filename )
@@ -1013,7 +1165,7 @@ void Client::setProxy( const std::string& proxy,
 bool Client::hasActiveRequests() const
 {
   #if defined(ENABLE_CURL)
-    return d->haveActiveRequests;
+    return !d->requests.empty();
   #else
     ConnectionDict::const_iterator it = d->connections.begin();
     for (; it != d->connections.end(); ++it) {
@@ -1064,9 +1216,14 @@ uint64_t Client::totalBytesDownloaded() const
 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
 {
   size_t byteSize = size * nmemb;
-
   Request* req = static_cast<Request*>(userdata);
   req->processBodyBytes(ptr, byteSize);
+
+  Client* cl = req->http();
+  if (cl) {
+    cl->receivedBytes(byteSize);
+  }
+
   return byteSize;
 }
 
@@ -1121,7 +1278,12 @@ size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems
 void Client::debugDumpRequests()
 {
 #if defined(ENABLE_CURL)
-
+    SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
+    ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
+    for (; it != d->requests.end(); ++it) {
+        SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
+    }
+    SG_LOG(SG_IO, SG_INFO, "==");
 #else
     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
     ConnectionDict::iterator it = d->connections.begin();