//
#include "HTTPClient.hxx"
+#include "HTTPFileRequest.hxx"
#include <sstream>
#include <cassert>
#include <cstdlib> // rand()
#include <list>
-#include <iostream>
#include <errno.h>
#include <map>
+#include <stdexcept>
#include <boost/foreach.hpp>
#include <boost/algorithm/string/case_conv.hpp>
-#include <zlib.h>
-
#include <simgear/io/sg_netChat.hxx>
-#include <simgear/io/lowlevel.hxx>
+#include <simgear/io/HTTPContentDecode.hxx>
#include <simgear/misc/strutils.hxx>
#include <simgear/compiler.h>
#include <simgear/debug/logstream.hxx>
# endif
#endif
-using std::string;
-using std::stringstream;
-using std::vector;
-
namespace simgear
{
extern const int DEFAULT_HTTP_PORT = 80;
const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
const unsigned int MAX_INFLIGHT_REQUESTS = 32;
-const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
-const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
-
-// see http://www.ietf.org/rfc/rfc1952.txt for these values and
-// detailed description of the logic
-const int GZIP_HEADER_ID1 = 31;
-const int GZIP_HEADER_ID2 = 139;
-const int GZIP_HEADER_METHOD_DEFLATE = 8;
-const unsigned int GZIP_HEADER_SIZE = 10;
-const int GZIP_HEADER_FEXTRA = 1 << 2;
-const int GZIP_HEADER_FNAME = 1 << 3;
-const int GZIP_HEADER_COMMENT = 1 << 4;
-const int GZIP_HEADER_CRC = 1 << 1;
class Connection;
typedef std::multimap<std::string, Connection*> ConnectionDict;
SGTimeStamp timeTransferSample;
unsigned int bytesTransferred;
unsigned int lastTransferRate;
+ uint64_t totalBytesDownloaded;
};
class Connection : public NetChat
Connection(Client* pr) :
client(pr),
state(STATE_CLOSED),
- port(DEFAULT_HTTP_PORT),
- zlibInflateBuffer(NULL),
- zlibInflateBufferSize(0),
- zlibOutputBuffer(NULL)
+ port(DEFAULT_HTTP_PORT)
{
-
}
virtual ~Connection()
{
- if (zlibInflateBuffer) {
- free(zlibInflateBuffer);
- }
-
- if (zlibOutputBuffer) {
- free(zlibOutputBuffer);
- }
+ }
+
+ virtual void handleBufferRead (NetBuffer& buffer)
+ {
+ if( !activeRequest || !activeRequest->isComplete() )
+ return NetChat::handleBufferRead(buffer);
+
+ // Request should be aborted (signaled by setting its state to complete).
+
+ // force the state to GETTING_BODY, to simplify logic in
+ // responseComplete and handleClose
+ state = STATE_GETTING_BODY;
+ responseComplete();
}
- void setServer(const string& h, short p)
+ void setServer(const std::string& h, short p)
{
host = h;
port = p;
SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
activeRequest->setFailure(error, "socket error");
activeRequest = NULL;
+ _contentDecoder.reset();
}
state = STATE_SOCKET_ERROR;
sentRequests.erase(it);
}
activeRequest = NULL;
+ _contentDecoder.reset();
}
state = STATE_CLOSED;
SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
activeRequest->setFailure(ETIMEDOUT, "socket timeout");
activeRequest = NULL;
+ _contentDecoder.reset();
}
state = STATE_SOCKET_ERROR;
bodyTransferSize = -1;
chunkedTransfer = false;
- contentGZip = contentDeflate = false;
+ _contentDecoder.reset();
}
void tryStartNextRequest()
{
+ while( !queuedRequests.empty()
+ && queuedRequests.front()->isComplete() )
+ queuedRequests.pop_front();
+
if (queuedRequests.empty()) {
idleTime.stamp();
return;
Request_ptr r = queuedRequests.front();
r->requestStart();
- requestBodyBytesToSend = r->requestBodyLength();
-
- stringstream headerData;
- string path = r->path();
+
+ std::stringstream headerData;
+ std::string path = r->path();
assert(!path.empty());
- string query = r->query();
- string bodyData;
+ std::string query = r->query();
+ std::string bodyData;
if (!client->proxyHost().empty()) {
path = r->scheme() + "://" + r->host() + r->path();
}
- if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
+ if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
headerData << r->method() << " " << path << " HTTP/1.1\r\n";
bodyData = query.substr(1); // URL-encode, drop the leading '?'
headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
headerData << "Content-Length:" << bodyData.size() << "\r\n";
} else {
headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
- if (requestBodyBytesToSend >= 0) {
- headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
- headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
+ if( r->hasBodyData() )
+ {
+ headerData << "Content-Length:" << r->bodyLength() << "\r\n";
+ headerData << "Content-Type:" << r->bodyType() << "\r\n";
}
}
headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
}
- BOOST_FOREACH(string h, r->requestHeaders()) {
- headerData << h << ": " << r->header(h) << "\r\n";
+ BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
+ headerData << h.first << ": " << h.second << "\r\n";
}
headerData << "\r\n"; // final CRLF to terminate the headers
// drain down before trying to start any more requests.
return;
}
-
- while (requestBodyBytesToSend > 0) {
- char buf[4096];
- int len = r->getBodyData(buf, 4096);
- if (len > 0) {
- requestBodyBytesToSend -= len;
- if (!bufferSend(buf, len)) {
- SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
- state = STATE_SOCKET_ERROR;
- return;
+
+ if( r->hasBodyData() )
+ for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
+ {
+ char buf[4096];
+ size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
+ if( len )
+ {
+ if( !bufferSend(buf, len) )
+ {
+ SG_LOG(SG_IO,
+ SG_WARN,
+ "overflow the HTTP::Connection output buffer");
+ state = STATE_SOCKET_ERROR;
+ return;
+ }
+ body_bytes_sent += len;
+ }
+ else
+ {
+ SG_LOG(SG_IO,
+ SG_WARN,
+ "HTTP asynchronous request body generation is unsupported");
+ break;
}
- // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
- } else {
- SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
- break;
}
- }
- // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
- // "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
- // "\n\t on connection " << this);
- // successfully sent, remove from queue, and maybe send the next
+ // SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
+ // "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
+ // "\n\t on connection " << this);
+ // successfully sent, remove from queue, and maybe send the next
queuedRequests.pop_front();
sentRequests.push_back(r);
- state = STATE_WAITING_FOR_RESPONSE;
+ state = STATE_WAITING_FOR_RESPONSE;
- // pipelining, let's maybe send the next request right away
+ // pipelining, let's maybe send the next request right away
tryStartNextRequest();
}
{
idleTime.stamp();
client->receivedBytes(static_cast<unsigned int>(n));
-
- if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
- if (contentGZip || contentDeflate) {
- expandCompressedData(s, n);
- } else {
- activeRequest->processBodyBytes(s, n);
- }
- } else {
- buffer += string(s, n);
- }
- }
-
- void expandCompressedData(const char* s, int n)
- {
- int reqSize = n + zlib.avail_in;
- if (reqSize > zlibInflateBufferSize) {
- // reallocate
- unsigned char* newBuf = (unsigned char*) malloc(reqSize);
- memcpy(newBuf, zlib.next_in, zlib.avail_in);
- memcpy(newBuf + zlib.avail_in, s, n);
- free(zlibInflateBuffer);
- zlibInflateBuffer = newBuf;
- zlibInflateBufferSize = reqSize;
- } else {
- // important to use memmove here, since it's very likely
- // the source and destination ranges overlap
- memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
- memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
- }
-
- zlib.next_in = (unsigned char*) zlibInflateBuffer;
- zlib.avail_in = reqSize;
- zlib.next_out = zlibOutputBuffer;
- zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
-
- if (contentGZip && !handleGZipHeader()) {
- return;
- }
-
- int writtenSize = 0;
- do {
- int result = inflate(&zlib, Z_NO_FLUSH);
- if (result == Z_OK || result == Z_STREAM_END) {
- // nothing to do
- } else {
- SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
- return;
- }
-
- writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
- if (result == Z_STREAM_END) {
- break;
- }
- } while ((writtenSize == 0) && (zlib.avail_in > 0));
-
- if (writtenSize > 0) {
- activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
- }
+ if( (state == STATE_GETTING_BODY)
+ || (state == STATE_GETTING_CHUNKED_BYTES) )
+ _contentDecoder.receivedBytes(s, n);
+ else
+ buffer.append(s, n);
}
-
- bool handleGZipHeader()
- {
- // we clear this down to contentDeflate once the GZip header has been seen
- if (zlib.avail_in < GZIP_HEADER_SIZE) {
- return false; // need more header bytes
- }
-
- if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
- (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
- (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
- {
- return false; // invalid GZip header
- }
-
- char flags = zlibInflateBuffer[3];
- unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
- if (flags & GZIP_HEADER_FEXTRA) {
- gzipHeaderSize += 2;
- if (zlib.avail_in < gzipHeaderSize) {
- return false; // need more header bytes
- }
-
- unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
- if ( sgIsBigEndian() ) {
- sgEndianSwap( &extraHeaderBytes );
- }
-
- gzipHeaderSize += extraHeaderBytes;
- if (zlib.avail_in < gzipHeaderSize) {
- return false; // need more header bytes
- }
- }
-
- if (flags & GZIP_HEADER_FNAME) {
- gzipHeaderSize++;
- while (gzipHeaderSize <= zlib.avail_in) {
- if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
- break; // found terminating NULL character
- }
- }
- }
-
- if (flags & GZIP_HEADER_COMMENT) {
- gzipHeaderSize++;
- while (gzipHeaderSize <= zlib.avail_in) {
- if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
- break; // found terminating NULL character
- }
- }
- }
-
- if (flags & GZIP_HEADER_CRC) {
- gzipHeaderSize += 2;
- }
-
- if (zlib.avail_in < gzipHeaderSize) {
- return false; // need more header bytes
- }
-
- zlib.next_in += gzipHeaderSize;
- zlib.avail_in -= gzipHeaderSize;
- // now we've processed the GZip header, can decode as deflate
- contentGZip = false;
- contentDeflate = true;
- return true;
- }
-
+
virtual void foundTerminator(void)
{
idleTime.stamp();
void processHeader()
{
- string h = strutils::simplify(buffer);
+ std::string h = strutils::simplify(buffer);
if (h.empty()) { // blank line terminates headers
headersComplete();
-
- if (contentGZip || contentDeflate) {
- memset(&zlib, 0, sizeof(z_stream));
- if (!zlibOutputBuffer) {
- zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
- }
-
- // NULLs means we'll get default alloc+free methods
- // which is absolutely fine
- zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
- zlib.next_out = zlibOutputBuffer;
- if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
- SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
- }
- }
-
- if (chunkedTransfer) {
- state = STATE_GETTING_CHUNKED;
- } else if (noMessageBody || (bodyTransferSize == 0)) {
- // force the state to GETTING_BODY, to simplify logic in
- // responseComplete and handleClose
- state = STATE_GETTING_BODY;
- responseComplete();
- } else {
- setByteCount(bodyTransferSize); // may be -1, that's fine
- state = STATE_GETTING_BODY;
- }
-
return;
}
return;
}
- string key = strutils::simplify(buffer.substr(0, colonPos));
- string lkey = boost::to_lower_copy(key);
- string value = strutils::strip(buffer.substr(colonPos + 1));
+ std::string key = strutils::simplify(buffer.substr(0, colonPos));
+ std::string lkey = boost::to_lower_copy(key);
+ std::string value = strutils::strip(buffer.substr(colonPos + 1));
// only consider these if getting headers (as opposed to trailers
// of a chunked transfer)
} else if (lkey == "transfer-encoding") {
processTransferEncoding(value);
} else if (lkey == "content-encoding") {
- if (value == "gzip") {
- contentGZip = true;
- } else if (value == "deflate") {
- contentDeflate = true;
- } else if (value != "identity") {
- SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
- }
+ _contentDecoder.setEncoding(value);
}
}
activeRequest->responseHeader(lkey, value);
}
- void processTransferEncoding(const string& te)
+ void processTransferEncoding(const std::string& te)
{
if (te == "chunked") {
chunkedTransfer = true;
void headersComplete()
{
activeRequest->responseHeadersComplete();
+ _contentDecoder.initWithRequest(activeRequest);
+
+ if (chunkedTransfer) {
+ state = STATE_GETTING_CHUNKED;
+ } else if (noMessageBody || (bodyTransferSize == 0)) {
+ // force the state to GETTING_BODY, to simplify logic in
+ // responseComplete and handleClose
+ state = STATE_GETTING_BODY;
+ responseComplete();
+ } else {
+ setByteCount(bodyTransferSize); // may be -1, that's fine
+ state = STATE_GETTING_BODY;
+ }
}
void responseComplete()
{
- Request_ptr completedRequest = activeRequest;
- if (contentDeflate) {
- inflateEnd(&zlib);
- }
+ Request_ptr completedRequest = activeRequest;
+ _contentDecoder.finish();
assert(sentRequests.front() == activeRequest);
sentRequests.pop_front();
Client* client;
Request_ptr activeRequest;
ConnectionState state;
- string host;
+ std::string host;
short port;
std::string buffer;
int bodyTransferSize;
SGTimeStamp idleTime;
bool chunkedTransfer;
bool noMessageBody;
- int requestBodyBytesToSend;
-
- z_stream zlib;
- unsigned char* zlibInflateBuffer;
- int zlibInflateBufferSize;
- unsigned char* zlibOutputBuffer;
- bool contentGZip, contentDeflate;
-
+
RequestList queuedRequests;
RequestList sentRequests;
+
+ ContentDecoder _contentDecoder;
};
Client::Client() :
d->bytesTransferred = 0;
d->lastTransferRate = 0;
d->timeTransferSample.stamp();
+ d->totalBytesDownloaded = 0;
setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
}
void Client::update(int waitTimeout)
{
- d->poller.poll(waitTimeout);
- bool waitingRequests = !d->pendingRequests.empty();
+ if (!d->poller.hasChannels() && (waitTimeout > 0)) {
+ SGTimeStamp::sleepForMSec(waitTimeout);
+ } else {
+ d->poller.poll(waitTimeout);
+ }
+ bool waitingRequests = !d->pendingRequests.empty();
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ) {
Connection* con = it->second;
void Client::makeRequest(const Request_ptr& r)
{
- string host = r->host();
+ if( r->isComplete() )
+ return;
+
+ if( r->url().find("://") == std::string::npos ) {
+ r->setFailure(EINVAL, "malformed URL");
+ return;
+ }
+
+ if( r->url().find("http://") != 0 ) {
+ r->setFailure(EINVAL, "only HTTP protocol is supported");
+ return;
+ }
+
+ std::string host = r->host();
int port = r->port();
if (!d->proxy.empty()) {
host = d->proxy;
}
Connection* con = NULL;
- stringstream ss;
+ std::stringstream ss;
ss << host << "-" << port;
- string connectionId = ss.str();
+ std::string connectionId = ss.str();
bool havePending = !d->pendingRequests.empty();
bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
ConnectionDict::iterator consEnd = d->connections.end();
con->queueRequest(r);
}
+//------------------------------------------------------------------------------
+FileRequestRef Client::save( const std::string& url,
+ const std::string& filename )
+{
+ FileRequestRef req = new FileRequest(url, filename);
+ makeRequest(req);
+ return req;
+}
+
+//------------------------------------------------------------------------------
+MemoryRequestRef Client::load(const std::string& url)
+{
+ MemoryRequestRef req = new MemoryRequest(url);
+ makeRequest(req);
+ return req;
+}
+
void Client::requestFinished(Connection* con)
{
}
-void Client::setUserAgent(const string& ua)
+void Client::setUserAgent(const std::string& ua)
{
d->userAgent = ua;
}
return d->proxyAuth;
}
-void Client::setProxy(const string& proxy, int port, const string& auth)
+void Client::setProxy( const std::string& proxy,
+ int port,
+ const std::string& auth )
{
d->proxy = proxy;
d->proxyPort = port;
void Client::receivedBytes(unsigned int count)
{
d->bytesTransferred += count;
+ d->totalBytesDownloaded += count;
}
unsigned int Client::transferRateBytesPerSec() const
{
unsigned int e = d->timeTransferSample.elapsedMSec();
- if (e < 400) {
- // if called too frequently, return cahced value, to smooth out
- // < 1 sec changes in flow
+ if (e > 400) {
+ // too long a window, ignore
+ d->timeTransferSample.stamp();
+ d->bytesTransferred = 0;
+ d->lastTransferRate = 0;
+ return 0;
+ }
+
+ if (e < 100) { // avoid really narrow windows
return d->lastTransferRate;
}
unsigned int ratio = (d->bytesTransferred * 1000) / e;
+ // run a low-pass filter
+ unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
+ smoothed /= 400;
+
d->timeTransferSample.stamp();
d->bytesTransferred = 0;
- d->lastTransferRate = ratio;
- return ratio;
+ d->lastTransferRate = smoothed;
+ return smoothed;
+}
+
+uint64_t Client::totalBytesDownloaded() const
+{
+ return d->totalBytesDownloaded;
}
} // of namespace HTTP