#include <simgear/simgear_config.h>
-#if defined(ENABLE_CURL)
- #include <curl/multi.h>
-#else
- #include <simgear/io/HTTPContentDecode.hxx>
-#endif
+#include <curl/multi.h>
#include <simgear/io/sg_netChat.hxx>
class Client::ClientPrivate
{
public:
-#if defined(ENABLE_CURL)
CURLM* curlMulti;
- bool haveActiveRequests;
void createCurlMulti()
{
// see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
// we request HTTP 1.1 pipelining
curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
+#if (LIBCURL_VERSION_MINOR >= 30)
curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
(long) maxPipelineDepth);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
(long) maxHostConnections);
-
-
- }
-#else
- NetChannelPoller poller;
-// connections by host (potentially more than one)
- ConnectionDict connections;
#endif
+ }
+
+ typedef std::map<Request_ptr, CURL*> RequestCurlMap;
+ RequestCurlMap requests;
std::string userAgent;
std::string proxy;
uint64_t totalBytesDownloaded;
};
-#if !defined(ENABLE_CURL)
-class Connection : public NetChat
-{
-public:
- Connection(Client* pr, const std::string& conId) :
- client(pr),
- state(STATE_CLOSED),
- port(DEFAULT_HTTP_PORT),
- _connectionId(conId),
- _maxPipelineLength(255)
- {
- }
-
- virtual ~Connection()
- {
- }
-
- virtual void handleBufferRead (NetBuffer& buffer)
- {
- if( !activeRequest || !activeRequest->isComplete() )
- return NetChat::handleBufferRead(buffer);
-
- // Request should be aborted (signaled by setting its state to complete).
-
- // force the state to GETTING_BODY, to simplify logic in
- // responseComplete and handleClose
- setState(STATE_GETTING_BODY);
- responseComplete();
- }
-
- void setServer(const std::string& h, short p)
- {
- host = h;
- port = p;
- }
-
- void setMaxPipelineLength(unsigned int m)
- {
- _maxPipelineLength = m;
- }
-
- // socket-level errors
- virtual void handleError(int error)
- {
- const char* errStr = strerror(error);
- SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
- << errStr << ")");
-
- debugDumpRequests();
-
- if (!activeRequest)
- {
- // connection level failure, eg name lookup or routing
- // we won't have an active request yet, so let's fail all of the
- // requests since we presume it's a systematic failure for
- // the host in question
- BOOST_FOREACH(Request_ptr req, sentRequests) {
- req->setFailure(error, errStr);
- }
-
- BOOST_FOREACH(Request_ptr req, queuedRequests) {
- req->setFailure(error, errStr);
- }
-
- sentRequests.clear();
- queuedRequests.clear();
- }
-
- NetChat::handleError(error);
- if (activeRequest) {
- activeRequest->setFailure(error, errStr);
- activeRequest = NULL;
- _contentDecoder.reset();
- }
-
- setState(STATE_SOCKET_ERROR);
- }
-
- void handleTimeout()
- {
- handleError(ETIMEDOUT);
- }
-
- virtual void handleClose()
- {
- NetChat::handleClose();
-
- // closing of the connection from the server side when getting the body,
- bool canCloseState = (state == STATE_GETTING_BODY);
- if (canCloseState && activeRequest) {
- // check bodyTransferSize matches how much we actually transferred
- if (bodyTransferSize > 0) {
- if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
- SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
- << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
- }
- }
-
- // force state here, so responseComplete can avoid closing the
- // socket again
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
- setState(STATE_CLOSED);
- responseComplete();
- } else {
- if (state == STATE_WAITING_FOR_RESPONSE) {
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
- << sentRequests.front()->url());
- assert(!sentRequests.empty());
- sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
- // no active request, but don't restore the front sent one
- sentRequests.erase(sentRequests.begin());
- }
-
- if (activeRequest) {
- activeRequest->setFailure(500, "server closed connection");
- // remove the failed request from sentRequests, so it does
- // not get restored
- RequestList::iterator it = std::find(sentRequests.begin(),
- sentRequests.end(), activeRequest);
- if (it != sentRequests.end()) {
- sentRequests.erase(it);
- }
- activeRequest = NULL;
- _contentDecoder.reset();
- }
-
- setState(STATE_CLOSED);
- }
-
- if (sentRequests.empty()) {
- return;
- }
-
- // restore sent requests to the queue, so they will be re-sent
- // when the connection opens again
- queuedRequests.insert(queuedRequests.begin(),
- sentRequests.begin(), sentRequests.end());
- sentRequests.clear();
- }
-
- void queueRequest(const Request_ptr& r)
- {
- queuedRequests.push_back(r);
- tryStartNextRequest();
- }
-
- void beginResponse()
- {
- assert(!sentRequests.empty());
- assert(state == STATE_WAITING_FOR_RESPONSE);
-
- activeRequest = sentRequests.front();
- try {
- SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
- activeRequest->responseStart(buffer);
- } catch (sg_exception& e) {
- handleError(EIO);
- return;
- }
-
- setState(STATE_GETTING_HEADERS);
- buffer.clear();
- if (activeRequest->responseCode() == 204) {
- noMessageBody = true;
- } else if (activeRequest->method() == "HEAD") {
- noMessageBody = true;
- } else {
- noMessageBody = false;
- }
-
- bodyTransferSize = -1;
- chunkedTransfer = false;
- _contentDecoder.reset();
- }
-
- void tryStartNextRequest()
- {
- while( !queuedRequests.empty()
- && queuedRequests.front()->isComplete() )
- queuedRequests.pop_front();
-
- if (queuedRequests.empty()) {
- idleTime.stamp();
- return;
- }
-
- if (sentRequests.size() >= _maxPipelineLength) {
- return;
- }
-
- if (state == STATE_CLOSED) {
- if (!connectToHost()) {
- setState(STATE_SOCKET_ERROR);
- return;
- }
-
- SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
- setTerminator("\r\n");
- setState(STATE_IDLE);
- }
-
- Request_ptr r = queuedRequests.front();
- r->requestStart();
-
- std::stringstream headerData;
- std::string path = r->path();
- assert(!path.empty());
- std::string query = r->query();
- std::string bodyData;
-
- if (!client->proxyHost().empty()) {
- path = r->scheme() + "://" + r->host() + r->path();
- }
-
- if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
- headerData << r->method() << " " << path << " HTTP/1.1\r\n";
- bodyData = query.substr(1); // URL-encode, drop the leading '?'
- headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
- headerData << "Content-Length:" << bodyData.size() << "\r\n";
- } else {
- headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
- if( r->hasBodyData() )
- {
- headerData << "Content-Length:" << r->bodyLength() << "\r\n";
- headerData << "Content-Type:" << r->bodyType() << "\r\n";
- }
- }
-
- headerData << "Host: " << r->hostAndPort() << "\r\n";
- headerData << "User-Agent:" << client->userAgent() << "\r\n";
- headerData << "Accept-Encoding: deflate, gzip\r\n";
- if (!client->proxyAuth().empty()) {
- headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
- }
-
- BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
- headerData << h.first << ": " << h.second << "\r\n";
- }
-
- headerData << "\r\n"; // final CRLF to terminate the headers
- if (!bodyData.empty()) {
- headerData << bodyData;
- }
-
- bool ok = push(headerData.str().c_str());
- if (!ok) {
- SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
- // we've over-stuffed the socket, give up for now, let things
- // drain down before trying to start any more requests.
- return;
- }
-
- if( r->hasBodyData() )
- for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
- {
- char buf[4096];
- size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
- if( len )
- {
- if( !bufferSend(buf, len) )
- {
- SG_LOG(SG_IO,
- SG_WARN,
- "overflow the HTTP::Connection output buffer");
- state = STATE_SOCKET_ERROR;
- return;
- }
- body_bytes_sent += len;
- }
- else
- {
- SG_LOG(SG_IO,
- SG_WARN,
- "HTTP asynchronous request body generation is unsupported");
- break;
- }
- }
-
- SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
- // successfully sent, remove from queue, and maybe send the next
- queuedRequests.pop_front();
- sentRequests.push_back(r);
- if (state == STATE_IDLE) {
- setState(STATE_WAITING_FOR_RESPONSE);
- }
-
- // pipelining, let's maybe send the next request right away
- tryStartNextRequest();
- }
-
- virtual void collectIncomingData(const char* s, int n)
- {
- idleTime.stamp();
- client->receivedBytes(static_cast<unsigned int>(n));
-
- if( (state == STATE_GETTING_BODY)
- || (state == STATE_GETTING_CHUNKED_BYTES) )
- _contentDecoder.receivedBytes(s, n);
- else
- buffer.append(s, n);
- }
-
- virtual void foundTerminator(void)
- {
- idleTime.stamp();
- switch (state) {
- case STATE_WAITING_FOR_RESPONSE:
- beginResponse();
- break;
-
- case STATE_GETTING_HEADERS:
- processHeader();
- buffer.clear();
- break;
-
- case STATE_GETTING_BODY:
- responseComplete();
- break;
-
- case STATE_GETTING_CHUNKED:
- processChunkHeader();
- break;
-
- case STATE_GETTING_CHUNKED_BYTES:
- setTerminator("\r\n");
- setState(STATE_GETTING_CHUNKED);
- buffer.clear();
- break;
-
-
- case STATE_GETTING_TRAILER:
- processTrailer();
- buffer.clear();
- break;
-
- case STATE_IDLE:
- SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
-
- default:
- break;
- }
- }
-
- bool hasIdleTimeout() const
- {
- if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
- return false;
- }
-
- assert(sentRequests.empty());
- bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
- return isTimedOut;
- }
-
- bool hasErrorTimeout() const
- {
- if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
- return false;
- }
-
- bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
- return isTimedOut;
- }
-
- bool hasError() const
- {
- return (state == STATE_SOCKET_ERROR);
- }
-
- bool shouldStartNext() const
- {
- return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
- }
-
- bool isActive() const
- {
- return !queuedRequests.empty() || !sentRequests.empty();
- }
-
- std::string connectionId() const
- {
- return _connectionId;
- }
-
- void debugDumpRequests() const
- {
- SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
- << "; state=" << state << ")");
- if (activeRequest) {
- SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
- } else {
- SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
- }
-
- BOOST_FOREACH(Request_ptr req, sentRequests) {
- SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
- }
-
- BOOST_FOREACH(Request_ptr req, queuedRequests) {
- SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
- }
- }
-private:
- enum ConnectionState {
- STATE_IDLE = 0,
- STATE_WAITING_FOR_RESPONSE,
- STATE_GETTING_HEADERS,
- STATE_GETTING_BODY,
- STATE_GETTING_CHUNKED,
- STATE_GETTING_CHUNKED_BYTES,
- STATE_GETTING_TRAILER,
- STATE_SOCKET_ERROR,
- STATE_CLOSED ///< connection should be closed now
- };
-
- void setState(ConnectionState newState)
- {
- if (state == newState) {
- return;
- }
-
- state = newState;
- }
-
- bool connectToHost()
- {
- SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
-
- if (!open()) {
- SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
- return false;
- }
-
- if (connect(host.c_str(), port) != 0) {
- SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
- return false;
- }
-
- return true;
- }
-
-
- void processHeader()
- {
- std::string h = strutils::simplify(buffer);
- if (h.empty()) { // blank line terminates headers
- headersComplete();
- return;
- }
-
- int colonPos = buffer.find(':');
- if (colonPos < 0) {
- SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
- return;
- }
-
- std::string key = strutils::simplify(buffer.substr(0, colonPos));
- std::string lkey = boost::to_lower_copy(key);
- std::string value = strutils::strip(buffer.substr(colonPos + 1));
-
- // only consider these if getting headers (as opposed to trailers
- // of a chunked transfer)
- if (state == STATE_GETTING_HEADERS) {
- if (lkey == "content-length") {
-
- int sz = strutils::to_int(value);
- if (bodyTransferSize <= 0) {
- bodyTransferSize = sz;
- }
- activeRequest->setResponseLength(sz);
- } else if (lkey == "transfer-length") {
- bodyTransferSize = strutils::to_int(value);
- } else if (lkey == "transfer-encoding") {
- processTransferEncoding(value);
- } else if (lkey == "content-encoding") {
- _contentDecoder.setEncoding(value);
- }
- }
-
- activeRequest->responseHeader(lkey, value);
- }
-
- void processTransferEncoding(const std::string& te)
- {
- if (te == "chunked") {
- chunkedTransfer = true;
- } else {
- SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
- // failure
- }
- }
-
- void processChunkHeader()
- {
- if (buffer.empty()) {
- // blank line after chunk data
- return;
- }
-
- int chunkSize = 0;
- int semiPos = buffer.find(';');
- if (semiPos >= 0) {
- // extensions ignored for the moment
- chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
- } else {
- chunkSize = strutils::to_int(buffer, 16);
- }
-
- buffer.clear();
- if (chunkSize == 0) { // trailer start
- setState(STATE_GETTING_TRAILER);
- return;
- }
-
- setState(STATE_GETTING_CHUNKED_BYTES);
- setByteCount(chunkSize);
- }
-
- void processTrailer()
- {
- if (buffer.empty()) {
- // end of trailers
- responseComplete();
- return;
- }
-
- // process as a normal header
- processHeader();
- }
-
- void headersComplete()
- {
- activeRequest->responseHeadersComplete();
- _contentDecoder.initWithRequest(activeRequest);
-
- if (!activeRequest->serverSupportsPipelining()) {
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
- _maxPipelineLength = 1;
- }
-
- if (chunkedTransfer) {
- setState(STATE_GETTING_CHUNKED);
- } else if (noMessageBody || (bodyTransferSize == 0)) {
- // force the state to GETTING_BODY, to simplify logic in
- // responseComplete and handleClose
- setState(STATE_GETTING_BODY);
- responseComplete();
- } else {
- setByteCount(bodyTransferSize); // may be -1, that's fine
- setState(STATE_GETTING_BODY);
- }
- }
-
- void responseComplete()
- {
- Request_ptr completedRequest = activeRequest;
- _contentDecoder.finish();
-
- assert(sentRequests.front() == activeRequest);
- sentRequests.pop_front();
- bool doClose = activeRequest->closeAfterComplete();
- activeRequest = NULL;
-
- if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
- if (doClose) {
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
- // this will bring us into handleClose() above, which updates
- // state to STATE_CLOSED
- close();
-
- // if we have additional requests waiting, try to start them now
- tryStartNextRequest();
- }
- }
-
- if (state != STATE_CLOSED) {
- setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
- }
-
- // notify request after we change state, so this connection is idle
- // if completion triggers other requests (which is likely)
- completedRequest->responseComplete();
- client->requestFinished(this);
-
- setTerminator("\r\n");
- }
-
- Client* client;
- Request_ptr activeRequest;
- ConnectionState state;
- std::string host;
- short port;
- std::string buffer;
- int bodyTransferSize;
- SGTimeStamp idleTime;
- bool chunkedTransfer;
- bool noMessageBody;
-
- RequestList queuedRequests;
- RequestList sentRequests;
-
- ContentDecoder _contentDecoder;
- std::string _connectionId;
- unsigned int _maxPipelineLength;
-};
-#endif // of !ENABLE_CURL
-
Client::Client() :
d(new ClientPrivate)
{
d->totalBytesDownloaded = 0;
d->maxPipelineDepth = 5;
setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
-#if defined(ENABLE_CURL)
+
static bool didInitCurlGlobal = false;
if (!didInitCurlGlobal) {
curl_global_init(CURL_GLOBAL_ALL);
}
d->createCurlMulti();
-#endif
}
Client::~Client()
{
-#if defined(ENABLE_CURL)
curl_multi_cleanup(d->curlMulti);
-#endif
}
void Client::setMaxConnections(unsigned int maxCon)
{
d->maxConnections = maxCon;
-#if defined(ENABLE_CURL)
+#if (LIBCURL_VERSION_MINOR >= 30)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
#endif
}
void Client::setMaxHostConnections(unsigned int maxHostCon)
{
d->maxHostConnections = maxHostCon;
-#if defined(ENABLE_CURL)
+#if (LIBCURL_VERSION_MINOR >= 30)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
#endif
}
void Client::setMaxPipelineDepth(unsigned int depth)
{
d->maxPipelineDepth = depth;
-#if defined(ENABLE_CURL)
+#if (LIBCURL_VERSION_MINOR >= 30)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
-#else
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ) {
- it->second->setMaxPipelineLength(depth);
- }
#endif
}
void Client::update(int waitTimeout)
{
-#if defined(ENABLE_CURL)
int remainingActive, messagesInQueue;
curl_multi_perform(d->curlMulti, &remainingActive);
- d->haveActiveRequests = (remainingActive > 0);
CURLMsg* msg;
while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
if (msg->msg == CURLMSG_DONE) {
- Request* req;
+ Request* rawReq = 0;
CURL *e = msg->easy_handle;
- curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
+ curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq);
+
+ // ensure request stays valid for the moment
+ // eg if responseComplete cancels us
+ Request_ptr req(rawReq);
long responseCode;
curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
+ // remove from the requests map now,
+ // in case the callbacks perform a cancel. We'll use
+ // the absence from the request dict in cancel to avoid
+ // a double remove
+ ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req);
+ assert(it != d->requests.end());
+ assert(it->second == e);
+ d->requests.erase(it);
+
if (msg->data.result == 0) {
req->responseComplete();
} else {
- fprintf(stderr, "Result: %d - %s\n",
- msg->data.result, curl_easy_strerror(msg->data.result));
+ SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result));
req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
}
curl_multi_remove_handle(d->curlMulti, e);
-
- // balance the reference we take in makeRequest
- SGReferenced::put(req);
curl_easy_cleanup(e);
- }
- else {
- SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
+ } else {
+ // should never happen since CURLMSG_DONE is the only code
+ // defined!
+ SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg);
}
} // of curl message processing loop
SGTimeStamp::sleepForMSec(waitTimeout);
-#else
- if (!d->poller.hasChannels() && (waitTimeout > 0)) {
- SGTimeStamp::sleepForMSec(waitTimeout);
- } else {
- d->poller.poll(waitTimeout);
- }
-
- bool waitingRequests = !d->pendingRequests.empty();
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ) {
- Connection* con = it->second;
- if (con->hasIdleTimeout() ||
- con->hasError() ||
- con->hasErrorTimeout() ||
- (!con->isActive() && waitingRequests))
- {
- if (con->hasErrorTimeout()) {
- // tell the connection we're timing it out
- con->handleTimeout();
- }
-
- // connection has been idle for a while, clean it up
- // (or if we have requests waiting for a different host,
- // or an error condition
- ConnectionDict::iterator del = it++;
- delete del->second;
- d->connections.erase(del);
- } else {
- if (it->second->shouldStartNext()) {
- it->second->tryStartNextRequest();
- }
- ++it;
- }
- } // of connection iteration
-
- if (waitingRequests && (d->connections.size() < d->maxConnections)) {
- RequestList waiting(d->pendingRequests);
- d->pendingRequests.clear();
-
- // re-submit all waiting requests in order; this takes care of
- // finding multiple pending items targetted to the same (new)
- // connection
- BOOST_FOREACH(Request_ptr req, waiting) {
- makeRequest(req);
- }
- }
-#endif
}
void Client::makeRequest(const Request_ptr& r)
r->_client = this;
-#if defined(ENABLE_CURL)
+ ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
+ assert(rit == d->requests.end());
+
CURL* curlRequest = curl_easy_init();
curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
- // manually increase the ref count of the request
- SGReferenced::get(r.get());
+ d->requests[r] = curlRequest;
+
curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
// disable built-in libCurl progress feedback
curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
}
curl_multi_add_handle(d->curlMulti, curlRequest);
- d->haveActiveRequests = true;
-// FIXME - premature?
+// this seems premature, but we don't have a callback from Curl we could
+// use to trigger when the requst is actually sent.
r->requestStart();
+}
-#else
- if( r->url().find("http://") != 0 ) {
- r->setFailure(EINVAL, "only HTTP protocol is supported");
- return;
- }
-
- std::string host = r->host();
- int port = r->port();
- if (!d->proxy.empty()) {
- host = d->proxy;
- port = d->proxyPort;
- }
-
- Connection* con = NULL;
- std::stringstream ss;
- ss << host << "-" << port;
- std::string connectionId = ss.str();
- bool havePending = !d->pendingRequests.empty();
- bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
- ConnectionDict::iterator consEnd = d->connections.end();
-
- // assign request to an existing Connection.
- // various options exist here, examined in order
- ConnectionDict::iterator it = d->connections.find(connectionId);
- if (atConnectionsLimit && (it == consEnd)) {
- // maximum number of connections active, queue this request
- // when a connection goes inactive, we'll start this one
- d->pendingRequests.push_back(r);
+void Client::cancelRequest(const Request_ptr &r, std::string reason)
+{
+ ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
+ if(it == d->requests.end()) {
+ // already being removed, presumably inside ::update()
+ // nothing more to do
return;
}
- // scan for an idle Connection to the same host (likely if we're
- // retrieving multiple resources from the same host in quick succession)
- // if we have pending requests (waiting for a free Connection), then
- // force new requests on this id to always use the first Connection
- // (instead of the random selection below). This ensures that when
- // there's pressure on the number of connections to keep alive, one
- // host can't DoS every other.
- int count = 0;
- for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
- if (havePending || !it->second->isActive()) {
- con = it->second;
- break;
- }
- }
+ CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second);
+ assert(err == CURLM_OK);
- bool atHostConnectionsLimit = (count >= d->maxHostConnections);
+ // clear the request pointer form the curl-easy object
+ curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0);
- if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
- // all current connections are busy (active), and we don't
- // have free connections to allocate, so let's assign to
- // an existing one randomly. Ideally we'd used whichever one will
- // complete first but we don't have that info.
- int index = rand() % count;
- for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
- con = it->second;
- }
+ curl_easy_cleanup(it->second);
+ d->requests.erase(it);
- // allocate a new connection object
- if (!con) {
- static int connectionSuffx = 0;
-
- std::stringstream ss;
- ss << connectionId << "-" << connectionSuffx++;
-
- SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
- con = new Connection(this, ss.str());
- con->setServer(host, port);
- con->setMaxPipelineLength(d->maxPipelineDepth);
- d->poller.addChannel(con);
- d->connections.insert(d->connections.end(),
- ConnectionDict::value_type(connectionId, con));
- }
-
- SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
- con->queueRequest(r);
-#endif
+ r->setFailure(-1, reason);
}
//------------------------------------------------------------------------------
bool Client::hasActiveRequests() const
{
- #if defined(ENABLE_CURL)
- return d->haveActiveRequests;
- #else
- ConnectionDict::const_iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- if (it->second->isActive()) return true;
- }
-
- return false;
-#endif
+ return !d->requests.empty();
}
void Client::receivedBytes(unsigned int count)
void Client::debugDumpRequests()
{
-#if defined(ENABLE_CURL)
-
-#else
- SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- it->second->debugDumpRequests();
+ SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
+ ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
+ for (; it != d->requests.end(); ++it) {
+ SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
}
SG_LOG(SG_IO, SG_INFO, "==");
-#endif
}
void Client::clearAllConnections()
{
-#if defined(ENABLE_CURL)
curl_multi_cleanup(d->curlMulti);
d->createCurlMulti();
-#else
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- delete it->second;
- }
- d->connections.clear();
-#endif
}
} // of namespace HTTP