extern const int DEFAULT_HTTP_PORT = 80;
const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
-const unsigned int MAX_INFLIGHT_REQUESTS = 4;
class Connection;
typedef std::multimap<std::string, Connection*> ConnectionDict;
curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
- (long) MAX_INFLIGHT_REQUESTS);
+ (long) maxPipelineDepth);
+ curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
+ (long) maxHostConnections);
+
}
#else
int proxyPort;
std::string proxyAuth;
unsigned int maxConnections;
+ unsigned int maxHostConnections;
+ unsigned int maxPipelineDepth;
RequestList pendingRequests;
-
-
SGTimeStamp timeTransferSample;
unsigned int bytesTransferred;
unsigned int lastTransferRate;
client(pr),
state(STATE_CLOSED),
port(DEFAULT_HTTP_PORT),
- connectionId(conId)
+ _connectionId(conId),
+ _maxPipelineLength(255)
{
}
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
- state = STATE_GETTING_BODY;
+ setState(STATE_GETTING_BODY);
responseComplete();
}
port = p;
}
+ void setMaxPipelineLength(unsigned int m)
+ {
+ _maxPipelineLength = m;
+ }
+
// socket-level errors
virtual void handleError(int error)
{
const char* errStr = strerror(error);
- SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
+ SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
<< errStr << ")");
debugDumpRequests();
_contentDecoder.reset();
}
- state = STATE_SOCKET_ERROR;
+ setState(STATE_SOCKET_ERROR);
}
void handleTimeout()
// closing of the connection from the server side when getting the body,
bool canCloseState = (state == STATE_GETTING_BODY);
if (canCloseState && activeRequest) {
+ // check bodyTransferSize matches how much we actually transferred
+ if (bodyTransferSize > 0) {
+ if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
+ SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
+ << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
+ }
+ }
+
// force state here, so responseComplete can avoid closing the
// socket again
- state = STATE_CLOSED;
+ SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
+ setState(STATE_CLOSED);
responseComplete();
} else {
if (state == STATE_WAITING_FOR_RESPONSE) {
+ SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
+ << sentRequests.front()->url());
assert(!sentRequests.empty());
sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
// no active request, but don't restore the front sent one
_contentDecoder.reset();
}
- state = STATE_CLOSED;
+ setState(STATE_CLOSED);
}
if (sentRequests.empty()) {
activeRequest = sentRequests.front();
try {
+ SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
activeRequest->responseStart(buffer);
} catch (sg_exception& e) {
handleError(EIO);
return;
}
- state = STATE_GETTING_HEADERS;
+ setState(STATE_GETTING_HEADERS);
buffer.clear();
if (activeRequest->responseCode() == 204) {
noMessageBody = true;
return;
}
- if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
+ if (sentRequests.size() >= _maxPipelineLength) {
return;
}
if (state == STATE_CLOSED) {
if (!connectToHost()) {
-
+ setState(STATE_SOCKET_ERROR);
return;
}
+ SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
setTerminator("\r\n");
- state = STATE_IDLE;
+ setState(STATE_IDLE);
}
Request_ptr r = queuedRequests.front();
}
}
- SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
+ SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
// successfully sent, remove from queue, and maybe send the next
queuedRequests.pop_front();
sentRequests.push_back(r);
if (state == STATE_IDLE) {
- state = STATE_WAITING_FOR_RESPONSE;
+ setState(STATE_WAITING_FOR_RESPONSE);
}
// pipelining, let's maybe send the next request right away
case STATE_GETTING_CHUNKED_BYTES:
setTerminator("\r\n");
- state = STATE_GETTING_CHUNKED;
+ setState(STATE_GETTING_CHUNKED);
buffer.clear();
break;
bool shouldStartNext() const
{
- return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
+ return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
}
bool isActive() const
return !queuedRequests.empty() || !sentRequests.empty();
}
+ std::string connectionId() const
+ {
+ return _connectionId;
+ }
+
void debugDumpRequests() const
{
- SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
+ SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
<< "; state=" << state << ")");
if (activeRequest) {
SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
}
}
private:
+ enum ConnectionState {
+ STATE_IDLE = 0,
+ STATE_WAITING_FOR_RESPONSE,
+ STATE_GETTING_HEADERS,
+ STATE_GETTING_BODY,
+ STATE_GETTING_CHUNKED,
+ STATE_GETTING_CHUNKED_BYTES,
+ STATE_GETTING_TRAILER,
+ STATE_SOCKET_ERROR,
+ STATE_CLOSED ///< connection should be closed now
+ };
+
+ void setState(ConnectionState newState)
+ {
+ if (state == newState) {
+ return;
+ }
+
+ state = newState;
+ }
+
bool connectToHost()
{
SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
buffer.clear();
if (chunkSize == 0) { // trailer start
- state = STATE_GETTING_TRAILER;
+ setState(STATE_GETTING_TRAILER);
return;
}
- state = STATE_GETTING_CHUNKED_BYTES;
+ setState(STATE_GETTING_CHUNKED_BYTES);
setByteCount(chunkSize);
}
_contentDecoder.initWithRequest(activeRequest);
if (chunkedTransfer) {
- state = STATE_GETTING_CHUNKED;
+ setState(STATE_GETTING_CHUNKED);
} else if (noMessageBody || (bodyTransferSize == 0)) {
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
- state = STATE_GETTING_BODY;
+ setState(STATE_GETTING_BODY);
responseComplete();
} else {
setByteCount(bodyTransferSize); // may be -1, that's fine
- state = STATE_GETTING_BODY;
+ setState(STATE_GETTING_BODY);
}
}
if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
if (doClose) {
+ SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
// this will bring us into handleClose() above, which updates
// state to STATE_CLOSED
close();
}
if (state != STATE_CLOSED) {
- state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
+ setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
}
// notify request after we change state, so this connection is idle
setTerminator("\r\n");
}
- enum ConnectionState {
- STATE_IDLE = 0,
- STATE_WAITING_FOR_RESPONSE,
- STATE_GETTING_HEADERS,
- STATE_GETTING_BODY,
- STATE_GETTING_CHUNKED,
- STATE_GETTING_CHUNKED_BYTES,
- STATE_GETTING_TRAILER,
- STATE_SOCKET_ERROR,
- STATE_CLOSED ///< connection should be closed now
- };
-
Client* client;
Request_ptr activeRequest;
ConnectionState state;
RequestList sentRequests;
ContentDecoder _contentDecoder;
- std::string connectionId;
+ std::string _connectionId;
+ unsigned int _maxPipelineLength;
};
#endif // of !ENABLE_CURL
{
d->proxyPort = 0;
d->maxConnections = 4;
+ d->maxHostConnections = 4;
d->bytesTransferred = 0;
d->lastTransferRate = 0;
d->timeTransferSample.stamp();
d->totalBytesDownloaded = 0;
-
+ d->maxPipelineDepth = 5;
setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
#if defined(ENABLE_CURL)
static bool didInitCurlGlobal = false;
void Client::setMaxConnections(unsigned int maxCon)
{
- if (maxCon < 1) {
- throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
- }
-
d->maxConnections = maxCon;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
#endif
}
+void Client::setMaxHostConnections(unsigned int maxHostCon)
+{
+ d->maxHostConnections = maxHostCon;
+#if defined(ENABLE_CURL)
+ curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
+#endif
+}
+
+void Client::setMaxPipelineDepth(unsigned int depth)
+{
+ d->maxPipelineDepth = depth;
+#if defined(ENABLE_CURL)
+ curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
+#else
+ ConnectionDict::iterator it = d->connections.begin();
+ for (; it != d->connections.end(); ) {
+ it->second->setMaxPipelineLength(depth);
+ }
+#endif
+}
+
void Client::update(int waitTimeout)
{
#if defined(ENABLE_CURL)
}
}
- if (!con && atConnectionsLimit) {
+ bool atHostConnectionsLimit = (count >= d->maxHostConnections);
+
+ if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
// all current connections are busy (active), and we don't
// have free connections to allocate, so let's assign to
// an existing one randomly. Ideally we'd used whichever one will
// complete first but we don't have that info.
int index = rand() % count;
- for (it = d->connections.find(connectionId); index > 0; --index) { ; }
+ for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
con = it->second;
}
// allocate a new connection object
if (!con) {
- con = new Connection(this, connectionId);
+ static int connectionSuffx = 0;
+
+ std::stringstream ss;
+ ss << connectionId << "-" << connectionSuffx++;
+
+ SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
+ con = new Connection(this, ss.str());
con->setServer(host, port);
+ con->setMaxPipelineLength(d->maxPipelineDepth);
d->poller.addChannel(con);
d->connections.insert(d->connections.end(),
ConnectionDict::value_type(connectionId, con));
}
+ SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
con->queueRequest(r);
#endif
}