port(DEFAULT_HTTP_PORT)
{
}
-
+
// Virtual destructor: the handlers (handleError, handleClose, ...) are
// virtual and the object is driven through its NetChat base, so
// deletion through a base pointer must dispatch virtually.
virtual ~Connection()
{
}
state = STATE_GETTING_BODY;
responseComplete();
}
-
+
// Record the server endpoint this connection talks to; consumed later
// by connectToHost() when the socket is (re-)opened.
void setServer(const std::string& hostName, short portNum)
{
    host = hostName;
    port = portNum;
}
-
+
// socket-level errors
virtual void handleError(int error)
{
BOOST_FOREACH(Request_ptr req, sentRequests) {
req->setFailure(error, errStr);
}
-
+
BOOST_FOREACH(Request_ptr req, queuedRequests) {
req->setFailure(error, errStr);
}
-
+
sentRequests.clear();
queuedRequests.clear();
}
-
+
NetChat::handleError(error);
- if (activeRequest) {
+ if (activeRequest) {
activeRequest->setFailure(error, errStr);
activeRequest = NULL;
_contentDecoder.reset();
}
-
+
state = STATE_SOCKET_ERROR;
}
-
+
virtual void handleClose()
- {
+ {
NetChat::handleClose();
// closing of the connection from the server side when getting the body,
bool canCloseState = (state == STATE_GETTING_BODY);
if (canCloseState && activeRequest) {
- // force state here, so responseComplete can avoid closing the
+ // force state here, so responseComplete can avoid closing the
// socket again
state = STATE_CLOSED;
responseComplete();
} else {
if (activeRequest) {
activeRequest->setFailure(500, "server closed connection");
- // remove the failed request from sentRequests, so it does
+ // remove the failed request from sentRequests, so it does
// not get restored
- RequestList::iterator it = std::find(sentRequests.begin(),
+ RequestList::iterator it = std::find(sentRequests.begin(),
sentRequests.end(), activeRequest);
if (it != sentRequests.end()) {
sentRequests.erase(it);
activeRequest = NULL;
_contentDecoder.reset();
}
-
+
state = STATE_CLOSED;
}
-
+
if (sentRequests.empty()) {
return;
}
-
+
// restore sent requests to the queue, so they will be re-sent
// when the connection opens again
queuedRequests.insert(queuedRequests.begin(),
sentRequests.begin(), sentRequests.end());
sentRequests.clear();
}
-
+
// Invoked by the owning Client's update loop when this connection has
// tripped a timeout check (hasIdleTimeout / hasErrorTimeout).
void handleTimeout()
{
// report the timeout through the base-class error path
NetChat::handleError(ETIMEDOUT);
// NOTE(review): unlike handleError() above, the active request is
// discarded here without a setFailure() notification — confirm this
// is intentional (diff residue suggests lines may be missing).
activeRequest = NULL;
_contentDecoder.reset();
}
-
+
state = STATE_SOCKET_ERROR;
}
-
+
// Append a request to this connection's queue and opportunistically
// try to start sending it (tryStartNextRequest is a no-op when the
// connection is busy or at its in-flight limit).
void queueRequest(const Request_ptr& r)
{
queuedRequests.push_back(r);
tryStartNextRequest();
}
-
+
void beginResponse()
{
assert(!sentRequests.empty());
assert(state == STATE_WAITING_FOR_RESPONSE);
-
+
activeRequest = sentRequests.front();
try {
activeRequest->responseStart(buffer);
handleError(EIO);
return;
}
-
+
state = STATE_GETTING_HEADERS;
buffer.clear();
if (activeRequest->responseCode() == 204) {
chunkedTransfer = false;
_contentDecoder.reset();
}
-
+
void tryStartNextRequest()
{
while( !queuedRequests.empty()
idleTime.stamp();
return;
}
-
+
if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
return;
}
-
+
if (state == STATE_CLOSED) {
if (!connectToHost()) {
-
+
return;
}
-
+
setTerminator("\r\n");
state = STATE_IDLE;
}
-
+
Request_ptr r = queuedRequests.front();
r->requestStart();
assert(!path.empty());
std::string query = r->query();
std::string bodyData;
-
+
if (!client->proxyHost().empty()) {
path = r->scheme() + "://" + r->host() + r->path();
}
headerData << "Content-Type:" << r->bodyType() << "\r\n";
}
}
-
+
headerData << "Host: " << r->hostAndPort() << "\r\n";
headerData << "User-Agent:" << client->userAgent() << "\r\n";
headerData << "Accept-Encoding: deflate, gzip\r\n";
if (!bodyData.empty()) {
headerData << bodyData;
}
-
+
bool ok = push(headerData.str().c_str());
if (!ok) {
SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
break;
}
}
-
+
// SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
// "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
// "\n\t on connection " << this);
queuedRequests.pop_front();
sentRequests.push_back(r);
state = STATE_WAITING_FOR_RESPONSE;
-
+
// pipelining, let's maybe send the next request right away
tryStartNextRequest();
}
-
+
virtual void collectIncomingData(const char* s, int n)
{
idleTime.stamp();
case STATE_WAITING_FOR_RESPONSE:
beginResponse();
break;
-
+
case STATE_GETTING_HEADERS:
processHeader();
buffer.clear();
break;
-
+
case STATE_GETTING_BODY:
responseComplete();
break;
-
+
case STATE_GETTING_CHUNKED:
processChunkHeader();
break;
-
+
case STATE_GETTING_CHUNKED_BYTES:
setTerminator("\r\n");
state = STATE_GETTING_CHUNKED;
buffer.clear();
break;
-
+
case STATE_GETTING_TRAILER:
processTrailer();
buffer.clear();
break;
-
+
case STATE_IDLE:
SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
-
+
default:
break;
}
}
-
+
// Reaping predicate: true once an idle connection has gone ten
// seconds without any traffic (idleTime is stamped on activity).
bool hasIdleTimeout() const
{
    if (state != STATE_IDLE)
        return false;

    // an idle connection must have no responses outstanding
    assert(sentRequests.empty());

    const int tenSecondsMSec = 10 * 1000;
    return idleTime.elapsedMSec() > tenSecondsMSec;
}
-
+
bool hasErrorTimeout() const
{
if (state == STATE_IDLE) {
return false;
}
-
+
return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
}
-
+
// True when a socket-level failure has poisoned this connection.
bool hasError() const
{
    return state == STATE_SOCKET_ERROR;
}
-
+
// A queued request may only be started while we remain below the
// pipelining limit on in-flight (sent, unanswered) requests.
bool shouldStartNext() const
{
    if (queuedRequests.empty())
        return false;

    return sentRequests.size() < MAX_INFLIGHT_REQUESTS;
}
-
+
// A connection counts as active while it has any requests either
// queued locally or in flight on the wire.
// Fix: the closing brace of this method was lost to diff corruption
// in the original; the single-return body makes the reconstruction
// unambiguous.
bool isActive() const
{
    return !queuedRequests.empty() || !sentRequests.empty();
}
bool connectToHost()
{
SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
-
+
if (!open()) {
SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
return false;
}
-
+
if (connect(host.c_str(), port) != 0) {
return false;
}
-
+
return true;
}
-
-
+
+
void processHeader()
{
std::string h = strutils::simplify(buffer);
headersComplete();
return;
}
-
+
int colonPos = buffer.find(':');
if (colonPos < 0) {
SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
return;
}
-
+
std::string key = strutils::simplify(buffer.substr(0, colonPos));
std::string lkey = boost::to_lower_copy(key);
std::string value = strutils::strip(buffer.substr(colonPos + 1));
-
- // only consider these if getting headers (as opposed to trailers
+
+ // only consider these if getting headers (as opposed to trailers
// of a chunked transfer)
if (state == STATE_GETTING_HEADERS) {
if (lkey == "content-length") {
_contentDecoder.setEncoding(value);
}
}
-
+
activeRequest->responseHeader(lkey, value);
}
-
+
void processTransferEncoding(const std::string& te)
{
if (te == "chunked") {
// failure
}
}
-
+
void processChunkHeader()
{
if (buffer.empty()) {
// blank line after chunk data
return;
}
-
+
int chunkSize = 0;
int semiPos = buffer.find(';');
if (semiPos >= 0) {
} else {
chunkSize = strutils::to_int(buffer, 16);
}
-
+
buffer.clear();
if (chunkSize == 0) { // trailer start
state = STATE_GETTING_TRAILER;
return;
}
-
+
state = STATE_GETTING_CHUNKED_BYTES;
setByteCount(chunkSize);
}
-
+
void processTrailer()
- {
+ {
if (buffer.empty()) {
// end of trailers
responseComplete();
return;
}
-
+
// process as a normal header
processHeader();
}
-
+
void headersComplete()
{
activeRequest->responseHeadersComplete();
_contentDecoder.initWithRequest(activeRequest);
-
+
if (chunkedTransfer) {
state = STATE_GETTING_CHUNKED;
} else if (noMessageBody || (bodyTransferSize == 0)) {
state = STATE_GETTING_BODY;
}
}
-
+
void responseComplete()
{
Request_ptr completedRequest = activeRequest;
_contentDecoder.finish();
-
+
assert(sentRequests.front() == activeRequest);
sentRequests.pop_front();
bool doClose = activeRequest->closeAfterComplete();
activeRequest = NULL;
-
+
if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
if (doClose) {
// this will bring us into handleClose() above, which updates
// state to STATE_CLOSED
close();
-
+
// if we have additional requests waiting, try to start them now
tryStartNextRequest();
}
}
-
+
if (state != STATE_CLOSED) {
state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
}
-
+
// notify request after we change state, so this connection is idle
// if completion triggers other requests (which is likely)
// SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
completedRequest->responseComplete();
client->requestFinished(this);
-
+
setTerminator("\r\n");
}
-
+
// Connection/parser state machine.
// Fix: the original enum listed only four states, yet the handler
// methods in this file reference STATE_GETTING_HEADERS, STATE_GETTING_BODY,
// STATE_GETTING_CHUNKED, STATE_GETTING_CHUNKED_BYTES and
// STATE_GETTING_TRAILER — without them the file cannot compile. They
// are restored here, reconstructed from those usages.
enum ConnectionState {
    STATE_IDLE = 0,              ///< connected, nothing outstanding
    STATE_WAITING_FOR_RESPONSE,  ///< request(s) sent, no reply bytes yet
    STATE_GETTING_HEADERS,       ///< reading response header lines
    STATE_GETTING_BODY,          ///< reading a non-chunked body
    STATE_GETTING_CHUNKED,       ///< expecting a chunk-size line
    STATE_GETTING_CHUNKED_BYTES, ///< reading the bytes of one chunk
    STATE_GETTING_TRAILER,       ///< reading trailers after the last chunk
    STATE_SOCKET_ERROR,          ///< a socket-level error occurred
    STATE_CLOSED ///< connection should be closed now
};
-
+
Client* client;
Request_ptr activeRequest;
ConnectionState state;
SGTimeStamp idleTime;
bool chunkedTransfer;
bool noMessageBody;
-
+
RequestList queuedRequests;
RequestList sentRequests;
-
+
ContentDecoder _contentDecoder;
};
} else {
d->poller.poll(waitTimeout);
}
-
+
bool waitingRequests = !d->pendingRequests.empty();
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ) {
Connection* con = it->second;
- if (con->hasIdleTimeout() ||
+ if (con->hasIdleTimeout() ||
con->hasError() ||
con->hasErrorTimeout() ||
(!con->isActive() && waitingRequests))
// tell the connection we're timing it out
con->handleTimeout();
}
-
+
// connection has been idle for a while, clean it up
// (or if we have requests waiting for a different host,
// or an error condition
++it;
}
} // of connection iteration
-
+
if (waitingRequests && (d->connections.size() < d->maxConnections)) {
RequestList waiting(d->pendingRequests);
d->pendingRequests.clear();
-
+
// re-submit all waiting requests in order; this takes care of
// finding multiple pending items targetted to the same (new)
// connection
bool havePending = !d->pendingRequests.empty();
bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
ConnectionDict::iterator consEnd = d->connections.end();
-
+
// assign request to an existing Connection.
// various options exist here, examined in order
ConnectionDict::iterator it = d->connections.find(connectionId);
if (atConnectionsLimit && (it == consEnd)) {
// maximum number of connections active, queue this request
- // when a connection goes inactive, we'll start this one
+ // when a connection goes inactive, we'll start this one
d->pendingRequests.push_back(r);
return;
}
-
+
// scan for an idle Connection to the same host (likely if we're
// retrieving multiple resources from the same host in quick succession)
// if we have pending requests (waiting for a free Connection), then
break;
}
}
-
+
if (!con && atConnectionsLimit) {
// all current connections are busy (active), and we don't
// have free connections to allocate, so let's assign to
for (it = d->connections.find(connectionId); index > 0; --index) { ; }
con = it->second;
}
-
+
// allocate a new connection object
if (!con) {
con = new Connection(this);
con->setServer(host, port);
d->poller.addChannel(con);
- d->connections.insert(d->connections.end(),
+ d->connections.insert(d->connections.end(),
ConnectionDict::value_type(connectionId, con));
}
-
+
con->queueRequest(r);
}
// Callback from Connection::responseComplete() once a request on
// 'con' has fully completed.
// Fix: stray '-'/'+' diff-marker lines inside the body were removed.
// NOTE(review): the body is empty in this revision — verify against
// upstream that no rescheduling work belongs here.
void Client::requestFinished(Connection* con)
{
}
void Client::setUserAgent(const std::string& ua)
{
return d->userAgent;
}
-
+
/// Host name of the configured HTTP proxy; empty when no proxy is in
/// use (Connection checks this before rewriting the request path to an
/// absolute URL for proxying).
const std::string& Client::proxyHost() const
{
return d->proxy;
}
-
+
const std::string& Client::proxyAuth() const
{
return d->proxyAuth;
for (; it != d->connections.end(); ++it) {
if (it->second->isActive()) return true;
}
-
+
return false;
}
d->bytesTransferred += count;
d->totalBytesDownloaded += count;
}
-
+
unsigned int Client::transferRateBytesPerSec() const
{
unsigned int e = d->timeTransferSample.elapsedMSec();
d->lastTransferRate = 0;
return 0;
}
-
+
if (e < 100) { // avoid really narrow windows
return d->lastTransferRate;
}
-
+
unsigned int ratio = (d->bytesTransferred * 1000) / e;
// run a low-pass filter
unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
smoothed /= 400;
-
+
d->timeTransferSample.stamp();
d->bytesTransferred = 0;
d->lastTransferRate = smoothed;