1 #include "HTTPClient.hxx"
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
24 # if !defined(SIMGEAR_VERSION)
25 # define SIMGEAR_VERSION "simgear-development"
30 using std::stringstream;
// File-level constants for the HTTP client and its gzip/deflate decoding.

// Initial value of Connection::port; 'extern' gives the const external linkage.
39 extern const int DEFAULT_HTTP_PORT = 80;
// Request body type for which the query string is sent as the request body.
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Limit compared against the number of sent-but-unfinished requests on a connection.
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
// Size in bytes of the buffer that inflate() writes decompressed data into.
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
// Negative window bits ask zlib for raw deflate decoding (no zlib/gzip wrapper);
// the gzip header is parsed by hand in handleGZipHeader().
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic
// First two bytes of every gzip member (0x1f, 0x8b).
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
// Compression-method byte: 8 means deflate.
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
// Length in bytes of the fixed part of the gzip header.
50 const unsigned int GZIP_HEADER_SIZE = 10;
// Bit masks tested against the header's flags byte (these are NOT byte offsets).
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
// Ordered list of requests; used for both the queued and the sent lists.
56 typedef std::list<Request_ptr> RequestList;
// A single HTTP connection, implemented on top of NetChat.
58 class Connection : public NetChat
// Constructor: takes a Client pointer, defaults the port to DEFAULT_HTTP_PORT
// and starts with no zlib buffers allocated.
61 Connection(Client* pr) :
64 port(DEFAULT_HTTP_PORT),
65 zlibInflateBuffer(NULL),
66 zlibInflateBufferSize(0),
67 zlibOutputBuffer(NULL)
// Release the malloc()'d zlib buffers, if they were ever allocated.
// NOTE(review): presumably the destructor body; its header line is not visible here.
74 if (zlibInflateBuffer) {
75 free(zlibInflateBuffer);
78 if (zlibOutputBuffer) {
79 free(zlibOutputBuffer);
// Receives the host name and port of the server for this connection.
// NOTE(review): the port parameter is a signed short while Client::makeRequest
// passes an int; where short is 16 bits, ports above 32767 do not fit.
// Consider a wider or unsigned type - TODO confirm against the type of 'port'.
83 void setServer(const string& h, short p)
89 // socket-level errors
// Error callback for the socket. ENOENT is treated as a host-name lookup
// failure: every sent and queued request is failed and both lists are emptied.
// The code also forwards the error to NetChat::handleError, fails and clears
// the active request, and moves the connection to STATE_SOCKET_ERROR.
90 virtual void handleError(int error)
92 if (error == ENOENT) {
93 // name lookup failure
94 // we won't have an active request yet, so the logic below won't
95 // fire to actually call setFailure. Let's fail all of the requests
96 BOOST_FOREACH(Request_ptr req, sentRequests) {
97 req->setFailure(error, "hostname lookup failure");
100 BOOST_FOREACH(Request_ptr req, queuedRequests) {
101 req->setFailure(error, "hostname lookup failure");
104 // name lookup failure, abandon all requests on this connection
105 sentRequests.clear();
106 queuedRequests.clear();
// let the base class do its own error handling
109 NetChat::handleError(error);
111 SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
112 activeRequest->setFailure(error, "socket error");
113 activeRequest = NULL;
// hasError() reports this state; Client::update() then removes the connection
116 state = STATE_SOCKET_ERROR;
// Close callback. After the base-class handling, the active request is dealt
// with depending on whether the body was being received, the state becomes
// STATE_CLOSED, and any requests that were sent but not answered are moved
// back to the front of the queue.
119 virtual void handleClose()
121 NetChat::handleClose();
123 // closing of the connection from the server side when getting the body,
124 bool canCloseState = (state == STATE_GETTING_BODY);
125 if (canCloseState && activeRequest) {
126 // force state here, so responseComplete can avoid closing the
// connection a second time (it only closes in the body/trailer states)
128 state = STATE_CLOSED;
// the request was cut off by the server: report it as failed
132 activeRequest->setFailure(500, "server closed connection");
133 // remove the failed request from sentRequests, so it does
// not get restored to the queue by the re-queue step below
135 RequestList::iterator it = std::find(sentRequests.begin(),
136 sentRequests.end(), activeRequest);
137 if (it != sentRequests.end()) {
138 sentRequests.erase(it);
140 activeRequest = NULL;
143 state = STATE_CLOSED;
146 if (sentRequests.empty()) {
150 // restore sent requests to the queue, so they will be re-sent
151 // when the connection opens again
// inserted at the front so they go out before requests queued later
152 queuedRequests.insert(queuedRequests.begin(),
153 sentRequests.begin(), sentRequests.end());
154 sentRequests.clear();
// Appends a request to the back of the queue and immediately tries to start
// the next queued request.
157 void queueRequest(const Request_ptr& r)
159 queuedRequests.push_back(r);
160 tryStartNextRequest();
// Start of a response: the oldest sent request becomes the active request,
// receives the buffered start line, and the per-response parsing flags are reset.
// NOTE(review): the function header for this body is not visible here.
165 assert(!sentRequests.empty());
// responses are matched to sent requests in order, so take the front
167 activeRequest = sentRequests.front();
168 activeRequest->responseStart(buffer);
169 state = STATE_GETTING_HEADERS;
// a 204 response and a response to a HEAD request are treated as having no body
171 if (activeRequest->responseCode() == 204) {
172 noMessageBody = true;
173 } else if (activeRequest->method() == "HEAD") {
174 noMessageBody = true;
176 noMessageBody = false;
// -1 means the body size is not known yet; headers may set it later
179 bodyTransferSize = -1;
180 chunkedTransfer = false;
181 contentGZip = contentDeflate = false;
// Sends the request at the front of the queue, if any, and then calls itself
// again so further queued requests are pipelined.
// Visible steps: check the queue and the in-flight limit, connect if the
// connection is in STATE_CLOSED, build and push the request line and headers
// (plus a URL-encoded body when applicable), stream the request body in
// 4096-byte pieces, then move the request from the queued to the sent list.
184 void tryStartNextRequest()
186 if (queuedRequests.empty()) {
// '>=' so that at most MAX_INFLIGHT_REQUESTS are ever in flight, matching
// shouldStartNext(); the previous '>' allowed one request more than the limit.
191 if (sentRequests.size() >= MAX_INFLIGHT_REQUESTS) {
195 if (state == STATE_CLOSED) {
196 if (!connectToHost()) {
// responses are parsed line by line, so lines end at CRLF
200 setTerminator("\r\n");
204 Request_ptr r = queuedRequests.front();
206 requestBodyBytesToSend = r->requestBodyLength();
208 stringstream headerData;
209 string path = r->path();
210 assert(!path.empty());
211 string query = r->query();
// when a proxy is configured the request line carries the absolute URL
214 if (!client->proxyHost().empty()) {
215 path = r->scheme() + "://" + r->host() + r->path();
// URL-encoded body: the query string is sent as the body, not in the request line
218 if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
219 headerData << r->method() << " " << path << " HTTP/1.1\r\n";
220 bodyData = query.substr(1); // URL-encode, drop the leading '?'
221 headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
222 headerData << "Content-Length:" << bodyData.size() << "\r\n";
// otherwise the query stays in the request line
224 headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
225 if (requestBodyBytesToSend >= 0) {
226 headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
227 headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
231 headerData << "Host: " << r->hostAndPort() << "\r\n";
232 headerData << "User-Agent:" << client->userAgent() << "\r\n";
// advertise the encodings that expandCompressedData() can decode
233 headerData << "Accept-Encoding: deflate, gzip\r\n";
234 if (!client->proxyAuth().empty()) {
235 headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
// request-specific headers supplied by the Request object
238 BOOST_FOREACH(string h, r->requestHeaders()) {
239 headerData << h << ": " << r->header(h) << "\r\n";
242 headerData << "\r\n"; // final CRLF to terminate the headers
243 if (!bodyData.empty()) {
244 headerData << bodyData;
247 bool ok = push(headerData.str().c_str());
249 SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
250 // we've over-stuffed the socket, give up for now, let things
251 // drain down before trying to start any more requests.
// stream the request body in pieces obtained from the Request
255 while (requestBodyBytesToSend > 0) {
257 int len = r->getBodyData(buf, 4096);
259 requestBodyBytesToSend -= len;
260 if (!bufferSend(buf, len)) {
261 SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
262 state = STATE_SOCKET_ERROR;
265 // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
267 SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
272 SG_LOG(SG_IO, SG_DEBUG, "did start request:" << r->url() <<
273 "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
274 "\n\t on connection " << this);
275 // successfully sent, remove from queue, and maybe send the next
276 queuedRequests.pop_front();
277 sentRequests.push_back(r);
279 // pipelining, let's maybe send the next request right away
280 tryStartNextRequest();
// Callback for n received bytes at s. While a body or chunk payload is being
// received the bytes go to the active request, through the decompressor when
// the content is gzip or deflate encoded; the other visible path appends
// them to 'buffer'.
283 virtual void collectIncomingData(const char* s, int n)
286 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
287 if (contentGZip || contentDeflate) {
288 expandCompressedData(s, n);
// identity encoding: hand the bytes over unchanged
290 activeRequest->processBodyBytes(s, n);
293 buffer += string(s, n);
// Decompresses n bytes of compressed body data at s and passes the output to
// the active request. Input that zlib has not consumed yet (zlib.next_in /
// zlib.avail_in) is kept and the new bytes are appended after it.
298 void expandCompressedData(const char* s, int n)
// total input to present to zlib: leftover bytes plus the new ones
300 int reqSize = n + zlib.avail_in;
301 if (reqSize > zlibInflateBufferSize) {
// staging buffer too small: build a larger one holding leftover + new bytes
// NOTE(review): the malloc() result is not checked before use.
303 unsigned char* newBuf = (unsigned char*) malloc(reqSize);
304 memcpy(newBuf, zlib.next_in, zlib.avail_in);
305 memcpy(newBuf + zlib.avail_in, s, n);
306 free(zlibInflateBuffer);
307 zlibInflateBuffer = newBuf;
308 zlibInflateBufferSize = reqSize;
310 // important to use memmove here, since it's very likely
311 // the source and destination ranges overlap
312 memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
313 memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
// point zlib at the start of the staging buffer and at an empty output buffer
316 zlib.next_in = (unsigned char*) zlibInflateBuffer;
317 zlib.avail_in = reqSize;
318 zlib.next_out = zlibOutputBuffer;
319 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
// for gzip content the member header has to be consumed before inflating
321 if (contentGZip && !handleGZipHeader()) {
327 int result = inflate(&zlib, Z_NO_FLUSH);
328 if (result == Z_OK || result == Z_STREAM_END) {
331 SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
// number of decompressed bytes produced by this inflate() call
335 writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
336 if (result == Z_STREAM_END) {
// repeat while nothing was produced but input remains
339 } while ((writtenSize == 0) && (zlib.avail_in > 0));
341 if (writtenSize > 0) {
342 activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
// Checks and skips the gzip member header (RFC 1952) at the start of
// zlibInflateBuffer, so the bytes that follow can be inflated as raw deflate.
// Returns false when more header bytes are needed or when the magic/method
// bytes do not match. Once the whole header is present, zlib.next_in and
// zlib.avail_in are advanced past it and contentDeflate is set.
346 bool handleGZipHeader()
348 // we clear this down to contentDeflate once the GZip header has been seen
349 if (zlib.avail_in < GZIP_HEADER_SIZE) {
350 return false; // need more header bytes
// bytes 0..2: magic ID1, ID2 and the compression method
353 if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
354 (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
355 (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
357 return false; // invalid GZip header
// byte 3: flags announcing which optional header fields follow
360 char flags = zlibInflateBuffer[3];
361 unsigned int gzipHeaderSize = GZIP_HEADER_SIZE;
362 if (flags & GZIP_HEADER_FEXTRA) {
364 if (zlib.avail_in < gzipHeaderSize) {
365 return false; // need more header bytes
// XLEN is the little-endian 16-bit field directly after the 10-byte fixed
// header, so it is read at offset GZIP_HEADER_SIZE. GZIP_HEADER_FEXTRA is a
// flag bit mask (value 4), not an offset: using it read part of MTIME instead.
368 unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_SIZE));
369 if ( sgIsBigEndian() ) {
370 sgEndianSwap( &extraHeaderBytes );
373 gzipHeaderSize += extraHeaderBytes;
374 if (zlib.avail_in < gzipHeaderSize) {
375 return false; // need more header bytes
// optional zero-terminated original file name
379 if (flags & GZIP_HEADER_FNAME) {
381 while (gzipHeaderSize <= zlib.avail_in) {
382 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
383 break; // found terminating NULL character
// optional zero-terminated comment
388 if (flags & GZIP_HEADER_COMMENT) {
390 while (gzipHeaderSize <= zlib.avail_in) {
391 if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
392 break; // found terminating NULL character
// optional header CRC
397 if (flags & GZIP_HEADER_CRC) {
401 if (zlib.avail_in < gzipHeaderSize) {
402 return false; // need more header bytes
// skip the header so inflate() starts at the compressed data
405 zlib.next_in += gzipHeaderSize;
406 zlib.avail_in -= gzipHeaderSize;
407 // now we've processed the GZip header, can decode as deflate
409 contentDeflate = true;
// Callback for when the current terminator (or byte count) has been reached;
// dispatches on the connection state. Only the chunked-transfer cases are
// fully visible here.
413 virtual void foundTerminator(void)
421 case STATE_GETTING_HEADERS:
426 case STATE_GETTING_BODY:
// a complete chunk-size line has been received
430 case STATE_GETTING_CHUNKED:
431 processChunkHeader();
// chunk payload complete: return to line mode and expect the next chunk header
434 case STATE_GETTING_CHUNKED_BYTES:
435 setTerminator("\r\n");
436 state = STATE_GETTING_CHUNKED;
441 case STATE_GETTING_TRAILER:
// Idle-timeout test used by Client::update(): compares the time elapsed on
// idleTime with ten seconds. The state != STATE_IDLE case is handled by the
// guard (presumably returning false - the branch body is not visible).
451 bool hasIdleTimeout() const
453 if (state != STATE_IDLE) {
457 return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
// Error-timeout test used by Client::update(): compares the time elapsed on
// idleTime with 30 seconds. The STATE_IDLE case is handled by the guard
// (presumably returning false - the branch body is not visible).
460 bool hasErrorTimeout() const
462 if (state == STATE_IDLE) {
466 return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
// True when the connection is in STATE_SOCKET_ERROR.
469 bool hasError() const
471 return (state == STATE_SOCKET_ERROR);
// True when a request is queued and fewer than MAX_INFLIGHT_REQUESTS requests
// are currently sent and unfinished.
474 bool shouldStartNext() const
476 return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
// True when this connection still has queued or sent requests.
479 bool isActive() const
481 return !queuedRequests.empty() || !sentRequests.empty();
// Opens the socket and connects to host:port, logging a warning when open()
// fails. tryStartNextRequest() treats a false result as failure.
// NOTE(review): the function header for this body is not visible here.
486 SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
489 SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
// a non-zero result from connect() is tested as the failure case
493 if (connect(host.c_str(), port) != 0) {
// Handles one received header line held in 'buffer'. A blank line ends the
// headers: zlib is initialised for compressed content and the next state is
// chosen. Any other line is split at the first ':' into key and value, the
// headers this class needs are interpreted, and the pair is passed to the
// active request.
// NOTE(review): the function header for this body is not visible here.
503 string h = strutils::simplify(buffer);
504 if (h.empty()) { // blank line terminates headers
// compressed body: reset the zlib stream and allocate the output buffer once
507 if (contentGZip || contentDeflate) {
508 memset(&zlib, 0, sizeof(z_stream));
509 if (!zlibOutputBuffer) {
510 zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
513 // NULLs means we'll get default alloc+free methods
514 // which is absolutely fine
515 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
516 zlib.next_out = zlibOutputBuffer;
517 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
518 SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
// pick the body-reading mode
522 if (chunkedTransfer) {
523 state = STATE_GETTING_CHUNKED;
524 } else if (noMessageBody || (bodyTransferSize == 0)) {
525 // force the state to GETTING_BODY, to simplify logic in
526 // responseComplete and handleClose
527 state = STATE_GETTING_BODY;
530 setByteCount(bodyTransferSize); // may be -1, that's fine
531 state = STATE_GETTING_BODY;
// NOTE(review): std::string::find returns size_t (npos when absent); the
// result is narrowed to int here.
537 int colonPos = buffer.find(':');
539 SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
// header names are compared in lower case
543 string key = strutils::simplify(buffer.substr(0, colonPos));
544 string lkey = boost::to_lower_copy(key);
545 string value = strutils::strip(buffer.substr(colonPos + 1));
547 // only consider these if getting headers (as opposed to trailers
548 // of a chunked transfer)
549 if (state == STATE_GETTING_HEADERS) {
550 if (lkey == "content-length") {
552 int sz = strutils::to_int(value);
// do not overwrite a positive transfer size that was already set
553 if (bodyTransferSize <= 0) {
554 bodyTransferSize = sz;
556 activeRequest->setResponseLength(sz);
557 } else if (lkey == "transfer-length") {
558 bodyTransferSize = strutils::to_int(value);
559 } else if (lkey == "transfer-encoding") {
560 processTransferEncoding(value);
561 } else if (lkey == "content-encoding") {
562 if (value == "gzip") {
564 } else if (value == "deflate") {
565 contentDeflate = true;
566 } else if (value != "identity") {
567 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
// hand the header (lower-cased name) to the request
572 activeRequest->responseHeader(lkey, value);
// Interprets a Transfer-Encoding header value: "chunked" turns on chunked
// body parsing; the other visible path logs a warning for an unsupported
// encoding.
575 void processTransferEncoding(const string& te)
577 if (te == "chunked") {
578 chunkedTransfer = true;
580 SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
// Parses a chunk-size line from 'buffer'. The size is hexadecimal and may be
// followed by ';' and extensions, which are ignored. Size 0 starts the
// trailer; any other size switches to reading exactly that many bytes.
585 void processChunkHeader()
587 if (buffer.empty()) {
588 // blank line after chunk data
// NOTE(review): std::string::find returns size_t; the result is narrowed to int.
593 int semiPos = buffer.find(';');
595 // extensions ignored for the moment
596 chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
// no extension present: the whole line is the size
598 chunkSize = strutils::to_int(buffer, 16);
602 if (chunkSize == 0) { // trailer start
603 state = STATE_GETTING_TRAILER;
// read the chunk payload by byte count instead of by terminator
607 state = STATE_GETTING_CHUNKED_BYTES;
608 setByteCount(chunkSize);
// Handles one line of the trailer that follows the last chunk. An empty line
// is handled by the guard; other lines are processed like ordinary headers.
611 void processTrailer()
613 if (buffer.empty()) {
619 // process as a normal header
// Notifies the active request that all of its response headers have been received.
623 void headersComplete()
625 activeRequest->responseHeadersComplete();
// Finishes the active request: notifies the request and the client, removes
// the request from the front of sentRequests, then tries to start further
// queued requests and returns to line-based parsing for the next response.
628 void responseComplete()
630 // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
631 activeRequest->responseComplete();
632 client->requestFinished(this);
// compressed content needs extra handling here (body of this branch not visible)
634 if (contentDeflate) {
// the finished response must belong to the oldest sent request
638 assert(sentRequests.front() == activeRequest);
639 sentRequests.pop_front();
// read the close flag before the request pointer is dropped
640 bool doClose = activeRequest->closeAfterComplete();
641 activeRequest = NULL;
643 if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
645 // this will bring us into handleClose() above, which updates
646 // state to STATE_CLOSED
649 // if we have additional requests waiting, try to start them now
650 tryStartNextRequest();
654 if (state != STATE_CLOSED) {
// the next response starts with a CRLF-terminated line
657 setTerminator("\r\n");
// Parsing and connection states. Other enumerators referenced in this class
// (STATE_IDLE, STATE_GETTING_BODY, STATE_SOCKET_ERROR) are not visible in
// this excerpt.
660 enum ConnectionState {
662 STATE_GETTING_HEADERS, // reading response header lines
664 STATE_GETTING_CHUNKED, // expecting a chunk-size line
665 STATE_GETTING_CHUNKED_BYTES, // reading the payload of one chunk
666 STATE_GETTING_TRAILER, // reading trailer lines after the last chunk
668 STATE_CLOSED ///< connection should be closed now
// Request whose response is currently being received; NULL when there is none.
672 Request_ptr activeRequest;
673 ConnectionState state;
// Expected body size in bytes from the response headers; -1 while unknown.
677 int bodyTransferSize;
// Timestamp evaluated by hasIdleTimeout() and hasErrorTimeout().
678 SGTimeStamp idleTime;
// Set when the response uses Transfer-Encoding: chunked.
679 bool chunkedTransfer;
// Bytes of request body still to be sent by tryStartNextRequest().
681 int requestBodyBytesToSend;
// Staging buffer for compressed input (leftover plus new bytes) and its capacity.
684 unsigned char* zlibInflateBuffer;
685 int zlibInflateBufferSize;
// Output buffer of ZLIB_DECOMPRESS_BUFFER_SIZE bytes, allocated on first use.
686 unsigned char* zlibOutputBuffer;
// Content-Encoding of the current response.
687 bool contentGZip, contentDeflate;
// Requests waiting to be sent, in order.
689 RequestList queuedRequests;
// Requests sent on the socket whose responses are not complete, oldest first.
690 RequestList sentRequests;
695 setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
// Polls all connections with the given timeout, then walks the connection
// table: connections that are idle-timed-out, in error, or error-timed-out
// are erased; on the other visible path a connection that can take another
// request is asked to start one.
698 void Client::update(int waitTimeout)
700 _poller.poll(waitTimeout);
702 ConnectionDict::iterator it = _connections.begin();
// no increment in the for header: the iterator is advanced inside the loop body
703 for (; it != _connections.end(); ) {
704 if (it->second->hasIdleTimeout() || it->second->hasError() ||
705 it->second->hasErrorTimeout())
707 // connection has been idle for a while, clean it up
708 // (or has an error condition, again clean it up)
// advance first so erasing 'del' does not invalidate the loop iterator
709 ConnectionDict::iterator del = it++;
711 _connections.erase(del);
713 if (it->second->shouldStartNext()) {
714 it->second->tryStartNextRequest();
719 } // of connection iteration
// Queues a request on the connection for its host and port, creating and
// registering that connection first when none exists yet. Connections are
// keyed by the string "<host>-<port>".
722 void Client::makeRequest(const Request_ptr& r)
724 string host = r->host();
725 int port = r->port();
// a configured proxy gets special handling here (body of this branch not visible)
726 if (!_proxy.empty()) {
732 ss << host << "-" << port;
733 string connectionId = ss.str();
735 if (_connections.find(connectionId) == _connections.end()) {
736 Connection* con = new Connection(this);
737 con->setServer(host, port);
// register with the poller so Client::update() services the new connection
738 _poller.addChannel(con);
739 _connections[connectionId] = con;
742 _connections[connectionId]->queueRequest(r);
745 void Client::requestFinished(Connection* con)
750 void Client::setUserAgent(const string& ua)
755 void Client::setProxy(const string& proxy, int port, const string& auth)
// Returns true as soon as any connection reports isActive(), i.e. still has
// queued or sent requests.
762 bool Client::hasActiveRequests() const
764 ConnectionDict::const_iterator it = _connections.begin();
765 for (; it != _connections.end(); ++it) {
766 if (it->second->isActive()) return true;
772 } // of namespace HTTP
774 } // of namespace simgear