#include <simgear/simgear_config.h>
-#if defined(ENABLE_CURL)
- #include <curl/multi.h>
-#else
- #include <simgear/io/HTTPContentDecode.hxx>
-#endif
+#include <curl/multi.h>
#include <simgear/io/sg_netChat.hxx>
class Client::ClientPrivate
{
public:
-#if defined(ENABLE_CURL)
CURLM* curlMulti;
void createCurlMulti()
typedef std::map<Request_ptr, CURL*> RequestCurlMap;
RequestCurlMap requests;
-#else
- NetChannelPoller poller;
-// connections by host (potentially more than one)
- ConnectionDict connections;
-#endif
std::string userAgent;
std::string proxy;
uint64_t totalBytesDownloaded;
};
-#if !defined(ENABLE_CURL)
-class Connection : public NetChat
-{
-public:
- Connection(Client* pr, const std::string& conId) :
- client(pr),
- state(STATE_CLOSED),
- port(DEFAULT_HTTP_PORT),
- _connectionId(conId),
- _maxPipelineLength(255)
- {
- }
-
- virtual ~Connection()
- {
- }
-
- virtual void handleBufferRead (NetBuffer& buffer)
- {
- if( !activeRequest || !activeRequest->isComplete() )
- return NetChat::handleBufferRead(buffer);
-
- // Request should be aborted (signaled by setting its state to complete).
-
- // force the state to GETTING_BODY, to simplify logic in
- // responseComplete and handleClose
- setState(STATE_GETTING_BODY);
- responseComplete();
- }
-
- void setServer(const std::string& h, short p)
- {
- host = h;
- port = p;
- }
-
- void setMaxPipelineLength(unsigned int m)
- {
- _maxPipelineLength = m;
- }
-
- // socket-level errors
- virtual void handleError(int error)
- {
- const char* errStr = strerror(error);
- SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
- << errStr << ")");
-
- debugDumpRequests();
-
- if (!activeRequest)
- {
- // connection level failure, eg name lookup or routing
- // we won't have an active request yet, so let's fail all of the
- // requests since we presume it's a systematic failure for
- // the host in question
- BOOST_FOREACH(Request_ptr req, sentRequests) {
- req->setFailure(error, errStr);
- }
-
- BOOST_FOREACH(Request_ptr req, queuedRequests) {
- req->setFailure(error, errStr);
- }
-
- sentRequests.clear();
- queuedRequests.clear();
- }
-
- NetChat::handleError(error);
- if (activeRequest) {
- activeRequest->setFailure(error, errStr);
- activeRequest = NULL;
- _contentDecoder.reset();
- }
-
- setState(STATE_SOCKET_ERROR);
- }
-
- void handleTimeout()
- {
- handleError(ETIMEDOUT);
- }
-
- virtual void handleClose()
- {
- NetChat::handleClose();
-
- // closing of the connection from the server side when getting the body,
- bool canCloseState = (state == STATE_GETTING_BODY);
- bool isCancelling = (state == STATE_CANCELLING);
-
- if (canCloseState && activeRequest) {
- // check bodyTransferSize matches how much we actually transferred
- if (bodyTransferSize > 0) {
- if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
- SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
- << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
- }
- }
-
- // force state here, so responseComplete can avoid closing the
- // socket again
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
- setState(STATE_CLOSED);
- responseComplete();
- } else {
- if (state == STATE_WAITING_FOR_RESPONSE) {
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
- << sentRequests.front()->url());
- assert(!sentRequests.empty());
- sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
- // no active request, but don't restore the front sent one
- sentRequests.erase(sentRequests.begin());
- }
-
- if (activeRequest && !isCancelling) {
- activeRequest->setFailure(500, "server closed connection");
- // remove the failed request from sentRequests, so it does
- // not get restored
- RequestList::iterator it = std::find(sentRequests.begin(),
- sentRequests.end(), activeRequest);
- if (it != sentRequests.end()) {
- sentRequests.erase(it);
- }
- activeRequest = NULL;
- _contentDecoder.reset();
- }
-
- setState(STATE_CLOSED);
- }
-
- if (sentRequests.empty()) {
- return;
- }
-
- // restore sent requests to the queue, so they will be re-sent
- // when the connection opens again
- queuedRequests.insert(queuedRequests.begin(),
- sentRequests.begin(), sentRequests.end());
- sentRequests.clear();
- }
-
- void queueRequest(const Request_ptr& r)
- {
- queuedRequests.push_back(r);
- tryStartNextRequest();
- }
-
- void cancelRequest(const Request_ptr& r)
- {
- RequestList::iterator it = std::find(sentRequests.begin(),
- sentRequests.end(), r);
- if (it != sentRequests.end()) {
- sentRequests.erase(it);
-
- if ((r == activeRequest) || !activeRequest) {
- // either the cancelling request is active, or we're in waiting
- // for response state - close now
- setState(STATE_CANCELLING);
- close();
-
- setState(STATE_CLOSED);
- activeRequest = NULL;
- _contentDecoder.reset();
- } else if (activeRequest) {
- SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url());
-
- // has been sent but not active, let the active finish and
- // then close. Otherwise cancelling request #2 would mess up
- // active transfer #1
- activeRequest->setCloseAfterComplete();
- }
- } // of request has been sent
-
- // simpler case, not sent yet just remove from the queue
- it = std::find(queuedRequests.begin(), queuedRequests.end(), r);
- if (it != queuedRequests.end()) {
- queuedRequests.erase(it);
- }
- }
-
- void beginResponse()
- {
- assert(!sentRequests.empty());
- assert(state == STATE_WAITING_FOR_RESPONSE);
-
- activeRequest = sentRequests.front();
- try {
- SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
- activeRequest->responseStart(buffer);
- } catch (sg_exception& e) {
- handleError(EIO);
- return;
- }
-
- setState(STATE_GETTING_HEADERS);
- buffer.clear();
- if (activeRequest->responseCode() == 204) {
- noMessageBody = true;
- } else if (activeRequest->method() == "HEAD") {
- noMessageBody = true;
- } else {
- noMessageBody = false;
- }
-
- bodyTransferSize = -1;
- chunkedTransfer = false;
- _contentDecoder.reset();
- }
-
- void tryStartNextRequest()
- {
- while( !queuedRequests.empty()
- && queuedRequests.front()->isComplete() )
- queuedRequests.pop_front();
-
- if (queuedRequests.empty()) {
- idleTime.stamp();
- return;
- }
-
- if (sentRequests.size() >= _maxPipelineLength) {
- return;
- }
-
- if (state == STATE_CLOSED) {
- if (!connectToHost()) {
- setState(STATE_SOCKET_ERROR);
- return;
- }
-
- SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
- setTerminator("\r\n");
- setState(STATE_IDLE);
- }
-
- Request_ptr r = queuedRequests.front();
- r->requestStart();
-
- std::stringstream headerData;
- std::string path = r->path();
- assert(!path.empty());
- std::string query = r->query();
- std::string bodyData;
-
- if (!client->proxyHost().empty()) {
- path = r->scheme() + "://" + r->host() + r->path();
- }
-
- if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
- headerData << r->method() << " " << path << " HTTP/1.1\r\n";
- bodyData = query.substr(1); // URL-encode, drop the leading '?'
- headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
- headerData << "Content-Length:" << bodyData.size() << "\r\n";
- } else {
- headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
- if( r->hasBodyData() )
- {
- headerData << "Content-Length:" << r->bodyLength() << "\r\n";
- headerData << "Content-Type:" << r->bodyType() << "\r\n";
- }
- }
-
- headerData << "Host: " << r->hostAndPort() << "\r\n";
- headerData << "User-Agent:" << client->userAgent() << "\r\n";
- headerData << "Accept-Encoding: deflate, gzip\r\n";
- if (!client->proxyAuth().empty()) {
- headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
- }
-
- BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
- headerData << h.first << ": " << h.second << "\r\n";
- }
-
- headerData << "\r\n"; // final CRLF to terminate the headers
- if (!bodyData.empty()) {
- headerData << bodyData;
- }
-
- bool ok = push(headerData.str().c_str());
- if (!ok) {
- SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
- // we've over-stuffed the socket, give up for now, let things
- // drain down before trying to start any more requests.
- return;
- }
-
- if( r->hasBodyData() )
- for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
- {
- char buf[4096];
- size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
- if( len )
- {
- if( !bufferSend(buf, len) )
- {
- SG_LOG(SG_IO,
- SG_WARN,
- "overflow the HTTP::Connection output buffer");
- state = STATE_SOCKET_ERROR;
- return;
- }
- body_bytes_sent += len;
- }
- else
- {
- SG_LOG(SG_IO,
- SG_WARN,
- "HTTP asynchronous request body generation is unsupported");
- break;
- }
- }
-
- SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
- // successfully sent, remove from queue, and maybe send the next
- queuedRequests.pop_front();
- sentRequests.push_back(r);
- if (state == STATE_IDLE) {
- setState(STATE_WAITING_FOR_RESPONSE);
- }
-
- // pipelining, let's maybe send the next request right away
- tryStartNextRequest();
- }
-
- virtual void collectIncomingData(const char* s, int n)
- {
- idleTime.stamp();
- client->receivedBytes(static_cast<unsigned int>(n));
-
- if( (state == STATE_GETTING_BODY)
- || (state == STATE_GETTING_CHUNKED_BYTES) )
- _contentDecoder.receivedBytes(s, n);
- else
- buffer.append(s, n);
- }
-
- virtual void foundTerminator(void)
- {
- idleTime.stamp();
- switch (state) {
- case STATE_WAITING_FOR_RESPONSE:
- beginResponse();
- break;
-
- case STATE_GETTING_HEADERS:
- processHeader();
- buffer.clear();
- break;
-
- case STATE_GETTING_BODY:
- responseComplete();
- break;
-
- case STATE_GETTING_CHUNKED:
- processChunkHeader();
- break;
-
- case STATE_GETTING_CHUNKED_BYTES:
- setTerminator("\r\n");
- setState(STATE_GETTING_CHUNKED);
- buffer.clear();
- break;
-
-
- case STATE_GETTING_TRAILER:
- processTrailer();
- buffer.clear();
- break;
-
- case STATE_IDLE:
- SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
-
- default:
- break;
- }
- }
-
- bool hasIdleTimeout() const
- {
- if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
- return false;
- }
-
- assert(sentRequests.empty());
- bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
- return isTimedOut;
- }
-
- bool hasErrorTimeout() const
- {
- if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
- return false;
- }
-
- bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
- return isTimedOut;
- }
-
- bool hasError() const
- {
- return (state == STATE_SOCKET_ERROR);
- }
-
- bool shouldStartNext() const
- {
- return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
- }
-
- bool isActive() const
- {
- return !queuedRequests.empty() || !sentRequests.empty();
- }
-
- std::string connectionId() const
- {
- return _connectionId;
- }
-
- void debugDumpRequests() const
- {
- SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
- << "; state=" << state << ")");
- if (activeRequest) {
- SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
- } else {
- SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
- }
-
- BOOST_FOREACH(Request_ptr req, sentRequests) {
- SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
- }
-
- BOOST_FOREACH(Request_ptr req, queuedRequests) {
- SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
- }
- }
-private:
- enum ConnectionState {
- STATE_IDLE = 0,
- STATE_WAITING_FOR_RESPONSE,
- STATE_GETTING_HEADERS,
- STATE_GETTING_BODY,
- STATE_GETTING_CHUNKED,
- STATE_GETTING_CHUNKED_BYTES,
- STATE_GETTING_TRAILER,
- STATE_SOCKET_ERROR,
- STATE_CANCELLING, ///< cancelling an acitve request
- STATE_CLOSED ///< connection should be closed now
- };
-
- void setState(ConnectionState newState)
- {
- if (state == newState) {
- return;
- }
-
- state = newState;
- }
-
- bool connectToHost()
- {
- SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
-
- if (!open()) {
- SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
- return false;
- }
-
- if (connect(host.c_str(), port) != 0) {
- SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
- return false;
- }
-
- return true;
- }
-
-
- void processHeader()
- {
- std::string h = strutils::simplify(buffer);
- if (h.empty()) { // blank line terminates headers
- headersComplete();
- return;
- }
-
- int colonPos = buffer.find(':');
- if (colonPos < 0) {
- SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
- return;
- }
-
- std::string key = strutils::simplify(buffer.substr(0, colonPos));
- std::string lkey = boost::to_lower_copy(key);
- std::string value = strutils::strip(buffer.substr(colonPos + 1));
-
- // only consider these if getting headers (as opposed to trailers
- // of a chunked transfer)
- if (state == STATE_GETTING_HEADERS) {
- if (lkey == "content-length") {
-
- int sz = strutils::to_int(value);
- if (bodyTransferSize <= 0) {
- bodyTransferSize = sz;
- }
- activeRequest->setResponseLength(sz);
- } else if (lkey == "transfer-length") {
- bodyTransferSize = strutils::to_int(value);
- } else if (lkey == "transfer-encoding") {
- processTransferEncoding(value);
- } else if (lkey == "content-encoding") {
- _contentDecoder.setEncoding(value);
- }
- }
-
- activeRequest->responseHeader(lkey, value);
- }
-
- void processTransferEncoding(const std::string& te)
- {
- if (te == "chunked") {
- chunkedTransfer = true;
- } else {
- SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
- // failure
- }
- }
-
- void processChunkHeader()
- {
- if (buffer.empty()) {
- // blank line after chunk data
- return;
- }
-
- int chunkSize = 0;
- int semiPos = buffer.find(';');
- if (semiPos >= 0) {
- // extensions ignored for the moment
- chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
- } else {
- chunkSize = strutils::to_int(buffer, 16);
- }
-
- buffer.clear();
- if (chunkSize == 0) { // trailer start
- setState(STATE_GETTING_TRAILER);
- return;
- }
-
- setState(STATE_GETTING_CHUNKED_BYTES);
- setByteCount(chunkSize);
- }
-
- void processTrailer()
- {
- if (buffer.empty()) {
- // end of trailers
- responseComplete();
- return;
- }
-
- // process as a normal header
- processHeader();
- }
-
- void headersComplete()
- {
- activeRequest->responseHeadersComplete();
- _contentDecoder.initWithRequest(activeRequest);
-
- if (!activeRequest->serverSupportsPipelining()) {
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
- _maxPipelineLength = 1;
- }
-
- if (chunkedTransfer) {
- setState(STATE_GETTING_CHUNKED);
- } else if (noMessageBody || (bodyTransferSize == 0)) {
- // force the state to GETTING_BODY, to simplify logic in
- // responseComplete and handleClose
- setState(STATE_GETTING_BODY);
- responseComplete();
- } else {
- setByteCount(bodyTransferSize); // may be -1, that's fine
- setState(STATE_GETTING_BODY);
- }
- }
-
- void responseComplete()
- {
- Request_ptr completedRequest = activeRequest;
- _contentDecoder.finish();
-
- assert(sentRequests.front() == activeRequest);
- sentRequests.pop_front();
- bool doClose = activeRequest->closeAfterComplete();
- activeRequest = NULL;
-
- if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
- if (doClose) {
- SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
- // this will bring us into handleClose() above, which updates
- // state to STATE_CLOSED
- close();
-
- // if we have additional requests waiting, try to start them now
- tryStartNextRequest();
- }
- }
-
- if (state != STATE_CLOSED) {
- setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
- }
-
- // notify request after we change state, so this connection is idle
- // if completion triggers other requests (which is likely)
- completedRequest->responseComplete();
- client->requestFinished(this);
-
- setTerminator("\r\n");
- }
-
- Client* client;
- Request_ptr activeRequest;
- ConnectionState state;
- std::string host;
- short port;
- std::string buffer;
- int bodyTransferSize;
- SGTimeStamp idleTime;
- bool chunkedTransfer;
- bool noMessageBody;
-
- RequestList queuedRequests;
- RequestList sentRequests;
-
- ContentDecoder _contentDecoder;
- std::string _connectionId;
- unsigned int _maxPipelineLength;
-};
-#endif // of !ENABLE_CURL
-
Client::Client() :
d(new ClientPrivate)
{
d->totalBytesDownloaded = 0;
d->maxPipelineDepth = 5;
setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
-#if defined(ENABLE_CURL)
+
static bool didInitCurlGlobal = false;
if (!didInitCurlGlobal) {
curl_global_init(CURL_GLOBAL_ALL);
}
d->createCurlMulti();
-#endif
}
Client::~Client()
{
-#if defined(ENABLE_CURL)
curl_multi_cleanup(d->curlMulti);
-#endif
}
void Client::setMaxConnections(unsigned int maxCon)
{
d->maxConnections = maxCon;
-#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
-#endif
}
void Client::setMaxHostConnections(unsigned int maxHostCon)
{
d->maxHostConnections = maxHostCon;
-#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
-#endif
}
void Client::setMaxPipelineDepth(unsigned int depth)
{
d->maxPipelineDepth = depth;
-#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
-#else
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ) {
- it->second->setMaxPipelineLength(depth);
- }
-#endif
}
void Client::update(int waitTimeout)
{
-#if defined(ENABLE_CURL)
int remainingActive, messagesInQueue;
curl_multi_perform(d->curlMulti, &remainingActive);
}
} // of curl message processing loop
SGTimeStamp::sleepForMSec(waitTimeout);
-#else
- if (!d->poller.hasChannels() && (waitTimeout > 0)) {
- SGTimeStamp::sleepForMSec(waitTimeout);
- } else {
- d->poller.poll(waitTimeout);
- }
-
- bool waitingRequests = !d->pendingRequests.empty();
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ) {
- Connection* con = it->second;
- if (con->hasIdleTimeout() ||
- con->hasError() ||
- con->hasErrorTimeout() ||
- (!con->isActive() && waitingRequests))
- {
- if (con->hasErrorTimeout()) {
- // tell the connection we're timing it out
- con->handleTimeout();
- }
-
- // connection has been idle for a while, clean it up
- // (or if we have requests waiting for a different host,
- // or an error condition
- ConnectionDict::iterator del = it++;
- delete del->second;
- d->connections.erase(del);
- } else {
- if (it->second->shouldStartNext()) {
- it->second->tryStartNextRequest();
- }
- ++it;
- }
- } // of connection iteration
-
- if (waitingRequests && (d->connections.size() < d->maxConnections)) {
- RequestList waiting(d->pendingRequests);
- d->pendingRequests.clear();
-
- // re-submit all waiting requests in order; this takes care of
- // finding multiple pending items targetted to the same (new)
- // connection
- BOOST_FOREACH(Request_ptr req, waiting) {
- makeRequest(req);
- }
- }
-#endif
}
void Client::makeRequest(const Request_ptr& r)
r->_client = this;
-#if defined(ENABLE_CURL)
ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
assert(rit == d->requests.end());
// this seems premature, but we don't have a callback from Curl we could
// use to trigger when the requst is actually sent.
r->requestStart();
-
-#else
- if( r->url().find("http://") != 0 ) {
- r->setFailure(EINVAL, "only HTTP protocol is supported");
- return;
- }
-
- std::string host = r->host();
- int port = r->port();
- if (!d->proxy.empty()) {
- host = d->proxy;
- port = d->proxyPort;
- }
-
- Connection* con = NULL;
- std::stringstream ss;
- ss << host << "-" << port;
- std::string connectionId = ss.str();
- bool havePending = !d->pendingRequests.empty();
- bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
- ConnectionDict::iterator consEnd = d->connections.end();
-
- // assign request to an existing Connection.
- // various options exist here, examined in order
- ConnectionDict::iterator it = d->connections.find(connectionId);
- if (atConnectionsLimit && (it == consEnd)) {
- // maximum number of connections active, queue this request
- // when a connection goes inactive, we'll start this one
- d->pendingRequests.push_back(r);
- return;
- }
-
- // scan for an idle Connection to the same host (likely if we're
- // retrieving multiple resources from the same host in quick succession)
- // if we have pending requests (waiting for a free Connection), then
- // force new requests on this id to always use the first Connection
- // (instead of the random selection below). This ensures that when
- // there's pressure on the number of connections to keep alive, one
- // host can't DoS every other.
- int count = 0;
- for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
- if (havePending || !it->second->isActive()) {
- con = it->second;
- break;
- }
- }
-
- bool atHostConnectionsLimit = (count >= d->maxHostConnections);
-
- if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
- // all current connections are busy (active), and we don't
- // have free connections to allocate, so let's assign to
- // an existing one randomly. Ideally we'd used whichever one will
- // complete first but we don't have that info.
- int index = rand() % count;
- for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
- con = it->second;
- }
-
- // allocate a new connection object
- if (!con) {
- static int connectionSuffx = 0;
-
- std::stringstream ss;
- ss << connectionId << "-" << connectionSuffx++;
-
- SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
- con = new Connection(this, ss.str());
- con->setServer(host, port);
- con->setMaxPipelineLength(d->maxPipelineDepth);
- d->poller.addChannel(con);
- d->connections.insert(d->connections.end(),
- ConnectionDict::value_type(connectionId, con));
- }
-
- SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
- con->queueRequest(r);
-#endif
}
void Client::cancelRequest(const Request_ptr &r, std::string reason)
{
-#if defined(ENABLE_CURL)
ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
if(it == d->requests.end()) {
// already being removed, presumably inside ::update()
curl_easy_cleanup(it->second);
d->requests.erase(it);
-#else
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- (it->second)->cancelRequest(r);
- }
-#endif
+
r->setFailure(-1, reason);
}
bool Client::hasActiveRequests() const
{
- #if defined(ENABLE_CURL)
return !d->requests.empty();
- #else
- ConnectionDict::const_iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- if (it->second->isActive()) return true;
- }
-
- return false;
-#endif
}
void Client::receivedBytes(unsigned int count)
void Client::debugDumpRequests()
{
-#if defined(ENABLE_CURL)
SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
for (; it != d->requests.end(); ++it) {
SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
}
SG_LOG(SG_IO, SG_INFO, "==");
-#else
- SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- it->second->debugDumpRequests();
- }
- SG_LOG(SG_IO, SG_INFO, "==");
-#endif
}
void Client::clearAllConnections()
{
-#if defined(ENABLE_CURL)
curl_multi_cleanup(d->curlMulti);
d->createCurlMulti();
-#else
- ConnectionDict::iterator it = d->connections.begin();
- for (; it != d->connections.end(); ++it) {
- delete it->second;
- }
- d->connections.clear();
-#endif
}
} // of namespace HTTP
+++ /dev/null
-// Written by James Turner
-//
-// Copyright (C) 2013 James Turner <zakalawe@mac.com>
-//
-// This library is free software; you can redistribute it and/or
-// modify it under the terms of the GNU Library General Public
-// License as published by the Free Software Foundation; either
-// version 2 of the License, or (at your option) any later version.
-//
-// This library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-// Library General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program; if not, write to the Free Software
-// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-//
-
-#include "HTTPContentDecode.hxx"
-
-#include <cassert>
-#include <cstdlib> // rand()
-#include <cstring> // for memset, memcpy
-
-#include <simgear/debug/logstream.hxx>
-#include <simgear/structure/exception.hxx>
-#include <simgear/io/lowlevel.hxx> // for sgEndian stuff
-
-namespace simgear
-{
-
-namespace HTTP
-{
-
- const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
- const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
-
- // see http://www.ietf.org/rfc/rfc1952.txt for these values and
- // detailed description of the logic
- const int GZIP_HEADER_ID1 = 31;
- const int GZIP_HEADER_ID2 = 139;
- const int GZIP_HEADER_METHOD_DEFLATE = 8;
- const unsigned int GZIP_HEADER_SIZE = 10;
- const int GZIP_HEADER_FEXTRA = 1 << 2;
- const int GZIP_HEADER_FNAME = 1 << 3;
- const int GZIP_HEADER_COMMENT = 1 << 4;
- const int GZIP_HEADER_CRC = 1 << 1;
-
-ContentDecoder::ContentDecoder() :
- _output(NULL),
- _zlib(NULL),
- _input(NULL),
- _inputAllocated(0)
-{
- reset();
-}
-
-ContentDecoder::~ContentDecoder()
-{
- free(_output);
- free(_input);
- free(_zlib);
-}
-
-void ContentDecoder::setEncoding(const std::string& encoding)
-{
- if (encoding == "gzip") {
- _contentDeflate = true;
- _needGZipHeader = true;
- } else if (encoding == "deflate") {
- _contentDeflate = true;
- _needGZipHeader = false;
- } else if (encoding != "identity") {
- SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << encoding);
- }
-}
-
-void ContentDecoder::reset()
-{
- _request = NULL;
- _contentDeflate = false;
- _needGZipHeader = false;
- _inputSize = 0;
- _totalReceivedBytes = 0;
-}
-
-void ContentDecoder::initWithRequest(Request_ptr req)
-{
- _request = req;
- if (!_contentDeflate) {
- return;
- }
-
- if (!_zlib) {
- _zlib = (z_stream*) malloc(sizeof(z_stream));
- }
-
- memset(_zlib, 0, sizeof(z_stream));
- if (!_output) {
- _output = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
- }
-
- _inputSize = 0;
- // NULLs means we'll get default alloc+free methods
- // which is absolutely fine
- _zlib->avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
- _zlib->next_out = _output;
- if (inflateInit2(_zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
- SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
- }
-}
-
-void ContentDecoder::finish()
-{
- if (_contentDeflate) {
- runDecoder();
- inflateEnd(_zlib);
- }
-}
-
-void ContentDecoder::receivedBytes(const char* n, size_t s)
-{
- _totalReceivedBytes += s;
- if (!_contentDeflate) {
- _request->processBodyBytes(n, s);
- return;
- }
-
-// allocate more space if needed (this will only happen rarely once the
-// buffer has hit something proportionate to the server's compression
-// window size)
- size_t requiredSize = _inputSize + s;
- if (requiredSize > _inputAllocated) {
- reallocateInputBuffer(requiredSize);
- }
-
-// copy newly recieved bytes into the buffer
- memcpy(_input + _inputSize, n, s);
- _inputSize += s;
-
- if (_needGZipHeader && !consumeGZipHeader()) {
- // still waiting on the full GZIP header, so done
- return;
- }
-
- runDecoder();
-}
-
-void ContentDecoder::consumeBytes(size_t consumed)
-{
- assert(_inputSize >= consumed);
-// move existing (consumed) bytes down
- if (consumed > 0) {
- size_t newSize = _inputSize - consumed;
- memmove(_input, _input + consumed, newSize);
- _inputSize = newSize;
- }
-}
-
-void ContentDecoder::reallocateInputBuffer(size_t newSize)
-{
- _input = (unsigned char*) realloc(_input, newSize);
- _inputAllocated = newSize;
-}
-
-void ContentDecoder::runDecoder()
-{
- _zlib->next_in = (unsigned char*) _input;
- _zlib->avail_in = _inputSize;
- int writtenSize;
-
- // loop, running zlib() inflate and sending output bytes to
- // our request body handler. Keep calling inflate until no bytes are
- // written, and ZLIB has consumed all available input
- do {
- _zlib->next_out = _output;
- _zlib->avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
- int result = inflate(_zlib, Z_NO_FLUSH);
- if (result == Z_OK || result == Z_STREAM_END) {
- // nothing to do
- } else if (result == Z_BUF_ERROR) {
- // transient error, fall through
- } else {
- // _error = result;
- return;
- }
-
- writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - _zlib->avail_out;
- if (writtenSize > 0) {
- _request->processBodyBytes((char*) _output, writtenSize);
- }
-
- if (result == Z_STREAM_END) {
- break;
- }
- } while ((_zlib->avail_in > 0) || (writtenSize > 0));
-
- // update input buffers based on what we consumed
- consumeBytes(_inputSize - _zlib->avail_in);
-}
-
-bool ContentDecoder::consumeGZipHeader()
-{
- size_t avail = _inputSize;
- if (avail < GZIP_HEADER_SIZE) {
- return false; // need more header bytes
- }
-
- if ((_input[0] != GZIP_HEADER_ID1) ||
- (_input[1] != GZIP_HEADER_ID2) ||
- (_input[2] != GZIP_HEADER_METHOD_DEFLATE))
- {
- return false; // invalid GZip header
- }
-
- char flags = _input[3];
- unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
- if (flags & GZIP_HEADER_FEXTRA) {
- gzipHeaderSize += 2;
- if (avail < gzipHeaderSize) {
- return false; // need more header bytes
- }
-
- unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(_input + GZIP_HEADER_FEXTRA));
- if ( sgIsBigEndian() ) {
- sgEndianSwap( &extraHeaderBytes );
- }
-
- gzipHeaderSize += extraHeaderBytes;
- if (avail < gzipHeaderSize) {
- return false; // need more header bytes
- }
- }
-
-#if 0
- if (flags & GZIP_HEADER_FNAME) {
- gzipHeaderSize++;
- while (gzipHeaderSize <= avail) {
- if (_input[gzipHeaderSize-1] == 0) {
- break; // found terminating NULL character
- }
- }
- }
-
- if (flags & GZIP_HEADER_COMMENT) {
- gzipHeaderSize++;
- while (gzipHeaderSize <= avail) {
- if (_input[gzipHeaderSize-1] == 0) {
- break; // found terminating NULL character
- }
- }
- }
-#endif
-
- if (flags & GZIP_HEADER_CRC) {
- gzipHeaderSize += 2;
- }
-
- if (avail < gzipHeaderSize) {
- return false; // need more header bytes
- }
-
- consumeBytes(gzipHeaderSize);
- _needGZipHeader = false;
- return true;
-}
-
-} // of namespace HTTP
-
-} // of namespace simgear