public:
#if defined(ENABLE_CURL)
CURLM* curlMulti;
- bool haveActiveRequests;
void createCurlMulti()
{
}
+
+ typedef std::map<Request_ptr, CURL*> RequestCurlMap;
+ RequestCurlMap requests;
#else
NetChannelPoller poller;
// connections by host (potentially more than one)
// closing of the connection from the server side when getting the body,
bool canCloseState = (state == STATE_GETTING_BODY);
+ bool isCancelling = (state == STATE_CANCELLING);
+
if (canCloseState && activeRequest) {
// check bodyTransferSize matches how much we actually transferred
if (bodyTransferSize > 0) {
sentRequests.erase(sentRequests.begin());
}
- if (activeRequest) {
+ if (activeRequest && !isCancelling) {
activeRequest->setFailure(500, "server closed connection");
// remove the failed request from sentRequests, so it does
// not get restored
tryStartNextRequest();
}
+ void cancelRequest(const Request_ptr& r)
+ {
+ RequestList::iterator it = std::find(sentRequests.begin(),
+ sentRequests.end(), r);
+ if (it != sentRequests.end()) {
+ sentRequests.erase(it);
+
+ if ((r == activeRequest) || !activeRequest) {
+ // either the cancelling request is active, or we're in waiting
+ // for response state - close now
+ setState(STATE_CANCELLING);
+ close();
+
+ setState(STATE_CLOSED);
+ activeRequest = NULL;
+ _contentDecoder.reset();
+ } else if (activeRequest) {
+ SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url());
+
+ // has been sent but not active, let the active finish and
+ // then close. Otherwise cancelling request #2 would mess up
+ // active transfer #1
+ activeRequest->setCloseAfterComplete();
+ }
+ } // of request has been sent
+
+ // simpler case, not sent yet just remove from the queue
+ it = std::find(queuedRequests.begin(), queuedRequests.end(), r);
+ if (it != queuedRequests.end()) {
+ queuedRequests.erase(it);
+ }
+ }
+
void beginResponse()
{
assert(!sentRequests.empty());
STATE_GETTING_CHUNKED_BYTES,
STATE_GETTING_TRAILER,
STATE_SOCKET_ERROR,
+ STATE_CANCELLING, ///< cancelling an acitve request
STATE_CLOSED ///< connection should be closed now
};
#if defined(ENABLE_CURL)
int remainingActive, messagesInQueue;
curl_multi_perform(d->curlMulti, &remainingActive);
- d->haveActiveRequests = (remainingActive > 0);
CURLMsg* msg;
while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
if (msg->msg == CURLMSG_DONE) {
- Request* req;
+ Request* rawReq = 0;
CURL *e = msg->easy_handle;
- curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
+ curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq);
+
+ // ensure request stays valid for the moment
+ // eg if responseComplete cancels us
+ Request_ptr req(rawReq);
long responseCode;
curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
+ // remove from the requests map now,
+ // in case the callbacks perform a cancel. We'll use
+ // the absence from the request dict in cancel to avoid
+ // a double remove
+ ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req);
+ assert(it != d->requests.end());
+ assert(it->second == e);
+ d->requests.erase(it);
+
if (msg->data.result == 0) {
req->responseComplete();
} else {
- fprintf(stderr, "Result: %d - %s\n",
- msg->data.result, curl_easy_strerror(msg->data.result));
+ SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result));
req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
}
curl_multi_remove_handle(d->curlMulti, e);
-
- // balance the reference we take in makeRequest
- SGReferenced::put(req);
curl_easy_cleanup(e);
- }
- else {
- SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
+ } else {
+ // should never happen since CURLMSG_DONE is the only code
+ // defined!
+ SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg);
}
} // of curl message processing loop
SGTimeStamp::sleepForMSec(waitTimeout);
r->_client = this;
#if defined(ENABLE_CURL)
+ ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
+ assert(rit == d->requests.end());
+
CURL* curlRequest = curl_easy_init();
curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
- // manually increase the ref count of the request
- SGReferenced::get(r.get());
+ d->requests[r] = curlRequest;
+
curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
// disable built-in libCurl progress feedback
curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
}
curl_multi_add_handle(d->curlMulti, curlRequest);
- d->haveActiveRequests = true;
-// FIXME - premature?
+// this seems premature, but we don't have a callback from Curl we could
+// use to trigger when the requst is actually sent.
r->requestStart();
#else
#endif
}
+void Client::cancelRequest(const Request_ptr &r, std::string reason)
+{
+#if defined(ENABLE_CURL)
+ ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
+ if(it == d->requests.end()) {
+ // already being removed, presumably inside ::update()
+ // nothing more to do
+ return;
+ }
+
+ CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second);
+ assert(err == CURLM_OK);
+
+ // clear the request pointer form the curl-easy object
+ curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0);
+
+ curl_easy_cleanup(it->second);
+ d->requests.erase(it);
+#else
+ ConnectionDict::iterator it = d->connections.begin();
+ for (; it != d->connections.end(); ++it) {
+ (it->second)->cancelRequest(r);
+ }
+#endif
+ r->setFailure(-1, reason);
+}
+
//------------------------------------------------------------------------------
FileRequestRef Client::save( const std::string& url,
const std::string& filename )
bool Client::hasActiveRequests() const
{
#if defined(ENABLE_CURL)
- return d->haveActiveRequests;
+ return !d->requests.empty();
#else
ConnectionDict::const_iterator it = d->connections.begin();
for (; it != d->connections.end(); ++it) {
void Client::debugDumpRequests()
{
#if defined(ENABLE_CURL)
-
+ SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
+ ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
+ for (; it != d->requests.end(); ++it) {
+ SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
+ }
+ SG_LOG(SG_IO, SG_INFO, "==");
#else
SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
ConnectionDict::iterator it = d->connections.begin();
COMPARE(tr->responseBytesReceived(), 0);
}
+ // test cancel
+ {
+ cout << "cancel request" << endl;
+ testServer.resetConnectCount();
+ cl.clearAllConnections();
+
+ cl.setProxy("", 80);
+ TestRequest* tr = new TestRequest("http://localhost:2000/test1");
+ HTTP::Request_ptr own(tr);
+ cl.makeRequest(tr);
+
+ TestRequest* tr2 = new TestRequest("http://localhost:2000/testLorem");
+ HTTP::Request_ptr own2(tr2);
+ cl.makeRequest(tr2);
+
+ TestRequest* tr3 = new TestRequest("http://localhost:2000/test1");
+ HTTP::Request_ptr own3(tr3);
+ cl.makeRequest(tr3);
+
+ cl.cancelRequest(tr, "my reason 1");
+
+ cl.cancelRequest(tr2, "my reason 2");
+
+ waitForComplete(&cl, tr3);
+
+ COMPARE(tr->responseCode(), -1);
+ COMPARE(tr2->responseReason(), "my reason 2");
+
+ COMPARE(tr3->responseLength(), strlen(BODY1));
+ COMPARE(tr3->responseBytesReceived(), strlen(BODY1));
+ COMPARE(tr3->bodyData, string(BODY1));
+ }
+
+ // test cancel
+ {
+ cout << "cancel middle request" << endl;
+ testServer.resetConnectCount();
+ cl.clearAllConnections();
+
+ cl.setProxy("", 80);
+ TestRequest* tr = new TestRequest("http://localhost:2000/test1");
+ HTTP::Request_ptr own(tr);
+ cl.makeRequest(tr);
+
+ TestRequest* tr2 = new TestRequest("http://localhost:2000/testLorem");
+ HTTP::Request_ptr own2(tr2);
+ cl.makeRequest(tr2);
+
+ TestRequest* tr3 = new TestRequest("http://localhost:2000/test1");
+ HTTP::Request_ptr own3(tr3);
+ cl.makeRequest(tr3);
+
+ cl.cancelRequest(tr2, "middle request");
+
+ waitForComplete(&cl, tr3);
+
+ COMPARE(tr->responseCode(), 200);
+ COMPARE(tr->responseLength(), strlen(BODY1));
+ COMPARE(tr->responseBytesReceived(), strlen(BODY1));
+ COMPARE(tr->bodyData, string(BODY1));
+
+ COMPARE(tr2->responseCode(), -1);
+
+ COMPARE(tr3->responseLength(), strlen(BODY1));
+ COMPARE(tr3->responseBytesReceived(), strlen(BODY1));
+ COMPARE(tr3->bodyData, string(BODY1));
+ }
+
{
cout << "get-during-response-send" << endl;
cl.clearAllConnections();