1 #include "HTTPClient.hxx"
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
24 # if !defined(SIMGEAR_VERSION)
25 # define SIMGEAR_VERSION "simgear-development"
30 using std::stringstream;
39 extern const int DEFAULT_HTTP_PORT = 80;
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
// gzip member header layout; see http://www.ietf.org/rfc/rfc1952.txt
// (section 2.3.1) for the meaning of each field.

// Fixed part of the header: magic bytes, compression method, total size.
const int GZIP_HEADER_ID1 = 0x1f;             // 31
const int GZIP_HEADER_ID2 = 0x8b;             // 139
const int GZIP_HEADER_METHOD_DEFLATE = 0x08;  // CM == 8 means deflate
const int GZIP_HEADER_SIZE = 10;              // bytes before optional fields

// FLG (header byte 3) bits announcing optional fields, in bit order.
const int GZIP_HEADER_CRC = 0x02;      // FHCRC: 2-byte header CRC16
const int GZIP_HEADER_FEXTRA = 0x04;   // FEXTRA: 2-byte XLEN + extra data
const int GZIP_HEADER_FNAME = 0x08;    // FNAME: NUL-terminated file name
const int GZIP_HEADER_COMMENT = 0x10;  // FCOMMENT: NUL-terminated comment
// Connection: one HTTP/1.1 connection (built on NetChat) to a single server.
// Requests wait in queuedRequests, move to sentRequests once written to the
// socket (several may be outstanding at once, i.e. pipelined), and the front
// of sentRequests becomes activeRequest while its response is parsed.
// NOTE(review): this listing omits many original lines (braces, blank lines
// and some statements); comments here only describe what is visible.
56 class Connection : public NetChat
// Constructor: presumably stores pr as the owning `client` (that initialiser
// is not shown). It starts on DEFAULT_HTTP_PORT with no zlib buffers
// allocated.
59 Connection(Client* pr) :
62 port(DEFAULT_HTTP_PORT),
63 zlibInflateBuffer(NULL),
64 zlibInflateBufferSize(0),
65 zlibOutputBuffer(NULL)
// Presumably the destructor (its signature is not shown): releases the
// malloc()'d zlib input and output buffers.
// NOTE(review): owning raw buffers means copying a Connection would
// double-free; confirm copy construction/assignment are disabled somewhere.
72 if (zlibInflateBuffer) {
73 free(zlibInflateBuffer);
76 if (zlibOutputBuffer) {
77 free(zlibOutputBuffer);
// Sets the server this connection talks to (body not shown; presumably
// stores h/p as the host/port used by connectToHost()).
// NOTE(review): the port parameter is `short`, but Client::makeRequest()
// passes an `int`; ports above 32767 do not fit and wrap to a wrong value.
// `unsigned short` or `int` would be safer.
81 void setServer(const string& h, short p)
// NetChat callback for socket-level errors.
// - ENOENT is treated as a hostname lookup failure. Every sent and queued
//   request is failed with setFailure() and dropped from this connection.
//   The branch presumably returns early; its closing lines are not shown.
// - Otherwise the error is forwarded to NetChat::handleError(), the request
//   whose response was in progress is failed, and the state becomes
//   STATE_SOCKET_ERROR. hasError() reports that state, so Client::update()
//   discards the connection.
87 // socket-level errors
88 virtual void handleError(int error)
90 if (error == ENOENT) {
91 // name lookup failure
92 // we won't have an active request yet, so the logic below won't
93 // fire to actually call setFailure. Let's fail all of the requests
94 BOOST_FOREACH(Request_ptr req, sentRequests) {
95 req->setFailure(error, "hostname lookup failure");
98 BOOST_FOREACH(Request_ptr req, queuedRequests) {
99 req->setFailure(error, "hostname lookup failure");
102 // name lookup failure, abandon all requests on this connection
103 sentRequests.clear();
104 queuedRequests.clear();
107 NetChat::handleError(error);
// NOTE(review): activeRequest is NULL whenever no response is being parsed
// (it is reset in responseComplete()). Confirm that the hidden lines guard
// the dereference below.
109 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
110 activeRequest->setFailure(error, "socket error");
111 activeRequest = NULL;
114 state = STATE_SOCKET_ERROR;
// NetChat callback when the socket is closed.
// - A close while reading a body for the active request presumably ends that
//   body (e.g. a response of unknown length). The hidden lines presumably
//   complete the response.
// - Afterwards the state is STATE_CLOSED.
// - Requests that were sent but not yet answered are moved back to the front
//   of queuedRequests, so they are re-sent when a new connection opens.
// NOTE(review): automatic re-sending is only safe for idempotent methods
// (RFC 7230 section 6.3.1). A POST that already reached the server could be
// executed twice.
117 virtual void handleClose()
119 NetChat::handleClose();
121 if ((state == STATE_GETTING_BODY) && activeRequest) {
122 // force state here, so responseComplete can avoid closing the
124 state = STATE_CLOSED;
127 state = STATE_CLOSED;
130 if (sentRequests.empty()) {
134 // restore sent requests to the queue, so they will be re-sent
135 // when the connection opens again
136 queuedRequests.insert(queuedRequests.begin(),
137 sentRequests.begin(), sentRequests.end());
138 sentRequests.clear();
// Appends r to the pending queue and immediately tries to send it.
// tryStartNextRequest() may leave it queued, e.g. when too many requests are
// already in flight or connecting fails.
141 void queueRequest(const Request_ptr& r)
143 queuedRequests.push_back(r);
144 tryStartNextRequest();
// Start of a response. The function signature is not shown; it is presumably
// called with the status line held in `buffer`.
// - The oldest sent request becomes activeRequest and is handed the status
//   line.
// - The per-response parse state is reset: body length unknown (-1), not
//   chunked, not compressed.
// - 204 responses and responses to HEAD carry no body (RFC 7230 section
//   3.3.3).
// NOTE(review): the same rule also covers 1xx and 304 responses, which are
// not handled here.
149 assert(!sentRequests.empty());
151 activeRequest = sentRequests.front();
152 activeRequest->responseStart(buffer);
153 state = STATE_GETTING_HEADERS;
155 if (activeRequest->responseCode() == 204) {
156 noMessageBody = true;
157 } else if (activeRequest->method() == "HEAD") {
158 noMessageBody = true;
160 noMessageBody = false;
163 bodyTransferSize = -1;
164 chunkedTransfer = false;
165 contentGZip = contentDeflate = false;
// Writes the next queued request (request line, headers, optional body) to
// the socket, then recurses to pipeline the following one. It stops early
// when:
// - the queue is empty;
// - too many requests are in flight;
// - (re)connecting to the server fails.
// Recursion depth is bounded because each successful pass adds one entry to
// sentRequests.
168 void tryStartNextRequest()
170 if (queuedRequests.empty()) {
// NOTE(review): '>' here vs '<' in shouldStartNext() allows up to
// MAX_INFLIGHT_REQUESTS + 1 outstanding requests; '>=' would match.
175 if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
179 if (state == STATE_CLOSED) {
180 if (!connectToHost()) {
184 setTerminator("\r\n");
188 Request_ptr r = queuedRequests.front();
189 requestBodyBytesToSend = r->requestBodyLength();
191 stringstream headerData;
192 string path = r->path();
193 assert(!path.empty());
194 string query = r->query();
// Through a proxy the request target must be the absolute URI
// (RFC 7230 section 5.3.2), not just the path.
197 if (!client->proxyHost().empty()) {
198 path = r->scheme() + "://" + r->host() + r->path();
// URL-encoded requests send the query string as the request body instead of
// appending it to the request target.
// NOTE(review): query.substr(1) throws std::out_of_range when query is
// empty, and assumes query starts with '?'. Confirm Request::query()
// guarantees a leading '?'.
201 if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
202 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
203 bodyData = query.substr(1); // URL-encode, drop the leading '?'
204 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
205 headerData << "Content-Length:" << bodyData.size() << "\r\n";
207 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
// A negative requestBodyLength() (presumably "no body") omits the
// Content-Length/Content-Type headers.
208 if (requestBodyBytesToSend >= 0) {
209 headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
210 headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
214 headerData << "Host: " << r->hostAndPort() << "\r\n";
215 headerData << "User-Agent:" << client->userAgent() << "\r\n";
216 headerData << "Accept-Encoding: deflate, gzip\r\n";
217 if (!client->proxyAuth().empty()) {
218 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
// NOTE(review): `string h` copies every header name; `const string& h`
// would avoid that.
221 BOOST_FOREACH(string h, r->requestHeaders()) {
222 headerData << h << ": " << r->header(h) << "\r\n";
225 headerData << "\r\n"; // final CRLF to terminate the headers
226 if (!bodyData.empty()) {
227 headerData << bodyData;
230 bool ok = push(headerData.str().c_str());
232 // we've over-stuffed the socket, give up for now, let things
233 // drain down before trying to start any more requests.
// Stream any remaining request body synchronously. The declarations of
// buf/len are not shown; getBodyData() presumably reports the bytes produced
// in `len`. A write failure marks the connection STATE_SOCKET_ERROR.
237 while (requestBodyBytesToSend > 0) {
240 r->getBodyData(buf, len);
242 requestBodyBytesToSend -= len;
243 if (!bufferSend(buf, len)) {
244 SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
245 state = STATE_SOCKET_ERROR;
249 SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
254 //std::cout << "did send request:" << r->url() << std::endl;
255 // successfully sent, remove from queue, and maybe send the next
256 queuedRequests.pop_front();
257 sentRequests.push_back(r);
259 // pipelining, let's maybe send the next request right away
260 tryStartNextRequest();
// NetChat callback for received bytes.
// - While a body or a chunk's data is being read, bytes go to the active
//   request. They are inflated first when the response is gzip/deflate
//   encoded.
// - Otherwise bytes accumulate in `buffer` until the current terminator is
//   found.
263 virtual void collectIncomingData(const char* s, int n)
266 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
267 if (contentGZip || contentDeflate) {
268 expandCompressedData(s, n);
270 activeRequest->processBodyBytes(s, n);
273 buffer += string(s, n);
// Feeds n compressed body bytes to zlib and passes decompressed output to
// activeRequest.
// - Input zlib has not consumed yet (zlib.next_in / zlib.avail_in) is kept,
//   and the new bytes are appended after it in zlibInflateBuffer.
// - For gzip, the RFC 1952 member header is parsed and skipped by hand
//   first, because inflateInit2() was given negative window bits (raw
//   deflate).
// NOTE(review): the inflate loop below stops once any output is produced,
// i.e. at most one ZLIB_DECOMPRESS_BUFFER_SIZE buffer per call. Input left
// in zlib.avail_in is only processed when more network data arrives, so the
// tail of a highly compressible body appears to be lost when the final
// network read expands to more than one buffer. Looping until
// avail_in == 0 && avail_out != 0, delivering each full buffer, would avoid
// this.
278 void expandCompressedData(const char* s, int n)
280 int reqSize = n + zlib.avail_in;
// Grow the staging buffer if needed, otherwise compact in place.
// NOTE(review): malloc() is not checked for NULL. Also, on the first call
// zlib.next_in is NULL (the z_stream is memset to zero when headers
// complete), and memcpy(dst, NULL, 0) is technically undefined behaviour.
281 if (reqSize > zlibInflateBufferSize) {
283 unsigned char* newBuf = (unsigned char*) malloc(reqSize);
284 memcpy(newBuf, zlib.next_in, zlib.avail_in);
285 memcpy(newBuf + zlib.avail_in, s, n);
286 free(zlibInflateBuffer);
287 zlibInflateBuffer = newBuf;
288 zlibInflateBufferSize = reqSize;
290 // important to use memmove here, since it's very likely
291 // the source and destination ranges overlap
292 memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
293 memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
296 zlib.next_in = (unsigned char*) zlibInflateBuffer;
297 zlib.avail_in = reqSize;
298 zlib.next_out = zlibOutputBuffer;
299 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
// gzip header handling (the enclosing contentGZip test is not shown).
// zlib.next_in is only advanced once the whole header is present, so the
// header is re-parsed from the start of zlibInflateBuffer on each call until
// enough bytes have arrived.
302 // we clear this down to contentDeflate once the GZip header has been seen
303 if (reqSize < GZIP_HEADER_SIZE) {
304 return; // need more header bytes
// NOTE(review): a bad magic/method header returns silently. The data stays
// buffered, nothing reaches the request, and no failure is reported.
307 if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
308 (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
309 (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
311 return; // invalid GZip header
314 char flags = zlibInflateBuffer[3];
315 int gzipHeaderSize = GZIP_HEADER_SIZE;
316 if (flags & GZIP_HEADER_FEXTRA) {
318 if (reqSize < gzipHeaderSize) {
319 return; // need more header bytes
// BUG(review): per RFC 1952 the 2-byte little-endian XLEN field sits at
// offset GZIP_HEADER_SIZE (10), directly after the fixed header. This line
// instead reads offset GZIP_HEADER_FEXTRA (4, which is the flag mask), i.e.
// the MTIME field. It is also an unaligned, type-punned read. Suggested:
//   unsigned short extraHeaderBytes = zlibInflateBuffer[GZIP_HEADER_SIZE] |
//       (zlibInflateBuffer[GZIP_HEADER_SIZE + 1] << 8);
// That form is endian-independent, so the swap below would no longer be
// needed.
322 unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
323 if ( sgIsBigEndian() ) {
324 sgEndianSwap( &extraHeaderBytes );
327 gzipHeaderSize += extraHeaderBytes;
328 if (reqSize < gzipHeaderSize) {
329 return; // need more header bytes
// FNAME / FCOMMENT: skip a NUL-terminated string (the lines advancing
// gzipHeaderSize are not shown). If no NUL has arrived yet, the size check
// further down returns and the header is retried later.
333 if (flags & GZIP_HEADER_FNAME) {
335 while (gzipHeaderSize <= reqSize) {
336 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
337 break; // found terminating NULL character
342 if (flags & GZIP_HEADER_COMMENT) {
344 while (gzipHeaderSize <= reqSize) {
345 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
346 break; // found terminating NULL character
// FHCRC: presumably skips the 2-byte header CRC16 (line not shown).
351 if (flags & GZIP_HEADER_CRC) {
355 if (reqSize < gzipHeaderSize) {
356 return; // need more header bytes
// Header complete: skip past it and decode the rest as raw deflate.
359 zlib.next_in += gzipHeaderSize;
360 zlib.avail_in = reqSize - gzipHeaderSize;
361 // now we've processed the GZip header, can decode as deflate
363 contentDeflate = true;
// Inflate until some output is produced or the input runs out.
// NOTE(review): Z_BUF_ERROR ("no progress possible") is not fatal per the
// zlib manual. It occurs, e.g., when avail_in is 0 right after the gzip
// header was stripped, but it is logged here as an error.
368 int result = inflate(&zlib, Z_NO_FLUSH);
369 if (result == Z_OK || result == Z_STREAM_END) {
372 SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
376 writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
377 } while ((writtenSize == 0) && (zlib.avail_in > 0));
379 if (writtenSize > 0) {
380 activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
// NetChat callback fired when the current terminator is reached: a CRLF
// line, or a byte count set with setByteCount(). It dispatches on the parser
// state; most case bodies are not shown here.
384 virtual void foundTerminator(void)
392 case STATE_GETTING_HEADERS:
397 case STATE_GETTING_BODY:
// A chunk-size line has been read.
401 case STATE_GETTING_CHUNKED:
402 processChunkHeader();
// A chunk's data bytes are done. The CRLF that follows them is expected
// next, then the next chunk-size line.
405 case STATE_GETTING_CHUNKED_BYTES:
406 setTerminator("\r\n");
407 state = STATE_GETTING_CHUNKED;
411 case STATE_GETTING_TRAILER:
// True when the connection is in STATE_IDLE and more than ten seconds have
// passed since idleTime was last stamped. Non-idle connections presumably
// return false (that line is not shown). Client::update() reaps connections
// for which this is true.
421 bool hasIdleTimeout() const
423 if (state != STATE_IDLE) {
427 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
// True when a non-idle connection has gone more than 30 seconds since
// idleTime was last stamped; idle connections presumably return false.
// Client::update() removes such connections as stalled.
// NOTE(review): where idleTime is stamped is not visible. If it is not
// refreshed as data arrives, any transfer lasting over 30 s is cut off.
430 bool hasErrorTimeout() const
432 if (state == STATE_IDLE) {
436 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
// True once a socket error occurred, whether set by handleError() or by a
// failed request-body write in tryStartNextRequest().
439 bool hasError() const
441 return (state == STATE_SOCKET_ERROR);
// True when requests are waiting and fewer than MAX_INFLIGHT_REQUESTS are
// outstanding. Client::update() then calls tryStartNextRequest().
444 bool shouldStartNext() const
446 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
// Body of connectToHost(); the signature is not shown. Its result is used as
// a success flag in tryStartNextRequest(). It presumably opens the socket and
// starts connecting to host:port, logging and returning false on failure.
451 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
454 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
458 if (connect(host.c_str(), port) != 0) {
// Header-line handler. The signature is not shown; it presumably runs for
// each header (and trailer) line held in `buffer`. A blank line ends the
// headers: zlib is set up if the body is compressed, and the body-reading
// mode is chosen.
468 string h = strutils::simplify(buffer);
469 if (h.empty()) { // blank line terminates headers
// Zero the z_stream so zlib uses its default allocators. The output buffer
// is allocated once and reused for later responses on this connection.
472 if (contentGZip || contentDeflate) {
473 memset(&zlib, 0, sizeof(z_stream));
474 if (!zlibOutputBuffer) {
475 zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
478 // NULLs means we'll get default alloc+free methods
479 // which is absolutely fine
480 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
481 zlib.next_out = zlibOutputBuffer;
// NOTE(review): on failure the visible code only logs. Confirm the hidden
// lines stop decoding; otherwise inflate() later runs on an uninitialised
// stream.
482 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
483 SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
// Choose the body mode:
// - chunked takes precedence over any length header;
// - no body / zero length presumably completes immediately;
// - otherwise read bodyTransferSize bytes, or until close when the length is
//   unknown (-1).
487 if (chunkedTransfer) {
488 state = STATE_GETTING_CHUNKED;
489 } else if (noMessageBody || (bodyTransferSize == 0)) {
490 // force the state to GETTING_BODY, to simplify logic in
491 // responseComplete and handleClose
492 state = STATE_GETTING_BODY;
495 setByteCount(bodyTransferSize); // may be -1, that's fine
496 state = STATE_GETTING_BODY;
// Ordinary "Key: value" line; keys are matched lower-cased.
// NOTE(review): find() returns size_t, and storing it in int relies on npos
// converting to -1 (implementation-defined before C++20).
502 int colonPos = buffer.find(':');
504 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
508 string key = strutils::simplify(buffer.substr(0, colonPos));
509 string lkey = boost::to_lower_copy(key);
510 string value = strutils::strip(buffer.substr(colonPos + 1));
512 // only consider these if getting headers (as opposed to trailers
513 // of a chunked transfer)
514 if (state == STATE_GETTING_HEADERS) {
// The first positive length sizes the body. The request is presumably told
// about every Content-Length value via setResponseLength().
515 if (lkey == "content-length") {
517 int sz = strutils::to_int(value);
518 if (bodyTransferSize <= 0) {
519 bodyTransferSize = sz;
521 activeRequest->setResponseLength(sz);
// NOTE(review): "Transfer-Length" is not a real HTTP header field; it is a
// concept name from RFC 2616 section 4.4.
522 } else if (lkey == "transfer-length") {
523 bodyTransferSize = strutils::to_int(value);
524 } else if (lkey == "transfer-encoding") {
525 processTransferEncoding(value);
// NOTE(review): content-coding names are case-insensitive (RFC 7231
// section 3.1.2.1), and "x-gzip" should be treated as "gzip" (RFC 7230
// section 4.2.3). Here `value` is compared case-sensitively.
526 } else if (lkey == "content-encoding") {
527 if (value == "gzip") {
529 } else if (value == "deflate") {
530 contentDeflate = true;
531 } else if (value != "identity") {
532 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
// Every header is passed on to the request under its lower-cased key
// (presumably trailers too).
537 activeRequest->responseHeader(lkey, value);
// Handles the Transfer-Encoding header; only "chunked" is understood.
// NOTE(review): the comparison is exact and case-sensitive. Values such as
// "Chunked", or a coding list ending in chunked (e.g. "gzip, chunked"), are
// logged as unsupported, and the body is then not de-chunked.
540 void processTransferEncoding(const string& te)
542 if (te == "chunked") {
543 chunkedTransfer = true;
545 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Parses a chunk-size line held in `buffer`.
// - The size is hexadecimal; any ";ext" chunk extensions are ignored.
// - A zero size starts the trailer section.
// - Otherwise exactly chunkSize data bytes are read next via setByteCount().
// An empty line (the CRLF after chunk data) is handled first; that branch's
// body is not shown.
// NOTE(review): how strutils::to_int() treats malformed hex is not visible
// here. The size_t result of find() is also stored in an int.
550 void processChunkHeader()
552 if (buffer.empty()) {
553 // blank line after chunk data
558 int semiPos = buffer.find(';');
560 // extensions ignored for the moment
561 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
563 chunkSize = strutils::to_int(buffer, 16);
567 if (chunkSize == 0) { // trailer start
568 state = STATE_GETTING_TRAILER;
572 state = STATE_GETTING_CHUNKED_BYTES;
573 setByteCount(chunkSize);
// Handles one line of the chunked trailer section.
// - A blank line ends the message; presumably the response is completed
//   (those lines are not shown).
// - Any other line is processed as a normal header.
576 void processTrailer()
578 if (buffer.empty()) {
584 // process as a normal header
// Tells the active request that all response headers have been received.
588 void headersComplete()
590 activeRequest->responseHeadersComplete();
// Finishes the active response. In order, it:
// - notifies the request and the Client;
// - presumably releases the inflate state (line not shown);
// - drops the request from sentRequests and clears activeRequest;
// - presumably closes the socket if the request asked for close-after-
//   complete, which re-enters handleClose() (lines partly hidden).
// The connection then presumably returns to STATE_IDLE unless closed, with
// the CRLF terminator restored for the next status line.
// NOTE(review): the end-of-headers code initialises inflate when
// contentGZip || contentDeflate, but cleanup here tests only contentDeflate.
// If the gzip header never completed (e.g. it was rejected as invalid),
// contentDeflate stays false and the z_stream is never released with
// inflateEnd(). Testing `contentGZip || contentDeflate` would fix that.
593 void responseComplete()
595 //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
596 activeRequest->responseComplete();
597 client->requestFinished(this);
599 if (contentDeflate) {
603 assert(sentRequests.front() == activeRequest);
604 sentRequests.pop_front();
605 bool doClose = activeRequest->closeAfterComplete();
606 activeRequest = NULL;
608 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
610 // this will bring us into handleClose() above, which updates
611 // state to STATE_CLOSED
614 // if we have additional requests waiting, try to start them now
615 tryStartNextRequest();
619 if (state != STATE_CLOSED) {
622 setTerminator("\r\n");
// Parser/connection states. STATE_IDLE, STATE_GETTING_BODY and
// STATE_SOCKET_ERROR are used elsewhere, but their enumerator lines are not
// shown in this listing.
625 enum ConnectionState {
627 STATE_GETTING_HEADERS,
// reading a chunk-size line
629 STATE_GETTING_CHUNKED,
// reading the data bytes of one chunk
630 STATE_GETTING_CHUNKED_BYTES,
// reading the trailer section after the zero-size chunk
631 STATE_GETTING_TRAILER,
633 STATE_CLOSED ///< connection should be closed now
// request whose response is currently being parsed; NULL between responses
637 Request_ptr activeRequest;
638 ConnectionState state;
// expected response body length in bytes; -1 when unknown
642 int bodyTransferSize;
// drives hasIdleTimeout()/hasErrorTimeout(); where it is stamped is not
// visible in this listing
643 SGTimeStamp idleTime;
644 bool chunkedTransfer;
// request-body bytes still to write; negative skips Content-Length
646 int requestBodyBytesToSend;
// zlib input staging buffer (malloc'd, capacity zlibInflateBufferSize) and
// output buffer (ZLIB_DECOMPRESS_BUFFER_SIZE bytes)
649 unsigned char* zlibInflateBuffer;
650 int zlibInflateBufferSize;
651 unsigned char* zlibOutputBuffer;
// response body is gzip / raw-deflate encoded; gzip is switched to deflate
// once its header has been consumed
652 bool contentGZip, contentDeflate;
// requests not yet written, and requests written but not yet answered
654 std::list<Request_ptr> queuedRequests;
655 std::list<Request_ptr> sentRequests;
// Client constructor body (signature not shown): the default User-Agent is
// "SimGear-" followed by the stringized SIMGEAR_VERSION.
// NOTE(review): if SG_STRINGIZE applies the # operator, the quoted fallback
// definition of SIMGEAR_VERSION near the top of this file would put literal
// double quotes into the User-Agent. Confirm this.
660 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
// Polls the network channels, waiting up to waitTimeout in whatever unit
// NetChannel::poll() uses, then walks the connections:
// - connections that have been idle too long, hit a socket error, or
//   stalled are removed from _connections;
// - the others start further queued requests if shouldStartNext() allows.
// NOTE(review): requests still queued or in flight on a reaped connection
// are not visibly failed here. Confirm the hidden lines (or the Connection
// destructor) report them; otherwise callers never get a completion.
663 void Client::update(int waitTimeout)
665 NetChannel::poll(waitTimeout);
667 ConnectionDict::iterator it = _connections.begin();
668 for (; it != _connections.end(); ) {
669 if (it->second->hasIdleTimeout() || it->second->hasError() ||
670 it->second->hasErrorTimeout())
672 // connection has been idle for a while, clean it up
673 // (or has an error condition, again clean it up)
// advance `it` before erase() so the loop iterator stays valid
674 ConnectionDict::iterator del = it++;
676 _connections.erase(del);
678 if (it->second->shouldStartNext()) {
679 it->second->tryStartNextRequest();
684 } // of connection iteration
// Queues r on the Connection for its server, creating that Connection on
// first use. Connections are keyed "host-port". When a proxy is configured,
// the hidden lines presumably substitute the proxy host/port, so all
// requests share the proxy connection.
// NOTE(review): Connection::setServer() takes `short`, so ports above 32767
// do not fit and wrap to a wrong value.
// NOTE(review): find() is followed by two operator[] lookups; keeping one
// iterator would avoid the repeated map searches.
687 void Client::makeRequest(const Request_ptr& r)
689 string host = r->host();
690 int port = r->port();
691 if (!_proxy.empty()) {
697 ss << host << "-" << port;
698 string connectionId = ss.str();
700 if (_connections.find(connectionId) == _connections.end()) {
701 Connection* con = new Connection(this);
702 con->setServer(host, port);
703 _connections[connectionId] = con;
706 _connections[connectionId]->queueRequest(r);
// Called from Connection::responseComplete() each time a response finishes
// on `con` (body not shown).
709 void Client::requestFinished(Connection* con)
// Sets the User-Agent value sent with every request (body not shown;
// presumably stored for userAgent()).
714 void Client::setUserAgent(const string& ua)
// Configures an HTTP proxy (body not shown). A non-empty auth is presumably
// returned by proxyAuth() and sent verbatim as the Proxy-Authorization value.
719 void Client::setProxy(const string& proxy, int port, const string& auth)
726 } // of namespace HTTP
728 } // of namespace simgear