]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
Ensure individual log-level setting works.
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7 #include <errno.h>
8
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
11
12 #include <zlib.h>
13
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
20
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
22 #include "version.h"
23 #else
24 #  if !defined(SIMGEAR_VERSION)
25 #    define SIMGEAR_VERSION "simgear-development"
26 #  endif
27 #endif
28
29 using std::string;
30 using std::stringstream;
31 using std::vector;
32
33 namespace simgear
34 {
35
36 namespace HTTP
37 {
38
39 extern const int DEFAULT_HTTP_PORT = 80;
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
44   
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic 
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
50 const unsigned int GZIP_HEADER_SIZE = 10;
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
55   
56 class Connection : public NetChat
57 {
58 public:
59     Connection(Client* pr) :
60         client(pr),
61         state(STATE_CLOSED),
62         port(DEFAULT_HTTP_PORT),
63         zlibInflateBuffer(NULL),
64         zlibInflateBufferSize(0),
65         zlibOutputBuffer(NULL)
66     {
67         
68     }
69     
70     virtual ~Connection()
71     {
72       if (zlibInflateBuffer) {
73         free(zlibInflateBuffer);
74       }
75       
76       if (zlibOutputBuffer) {
77         free(zlibOutputBuffer);
78       }
79     }
80   
81     void setServer(const string& h, short p)
82     {
83         host = h;
84         port = p;
85     }
86     
87     // socket-level errors
88     virtual void handleError(int error)
89     {
90         if (error == ENOENT) {
91         // name lookup failure
92             // we won't have an active request yet, so the logic below won't
93             // fire to actually call setFailure. Let's fail all of the requests
94             BOOST_FOREACH(Request_ptr req, sentRequests) {
95                 req->setFailure(error, "hostname lookup failure");
96             }
97             
98             BOOST_FOREACH(Request_ptr req, queuedRequests) {
99                 req->setFailure(error, "hostname lookup failure");
100             }
101             
102         // name lookup failure, abandon all requests on this connection
103             sentRequests.clear();
104             queuedRequests.clear();
105         }
106         
107         NetChat::handleError(error);
108         if (activeRequest) {            
109             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
110             activeRequest->setFailure(error, "socket error");
111             activeRequest = NULL;
112         }
113     
114         state = STATE_SOCKET_ERROR;
115     }
116     
117     virtual void handleClose()
118     {      
119         NetChat::handleClose();
120         
121         if ((state == STATE_GETTING_BODY) && activeRequest) {
122         // force state here, so responseComplete can avoid closing the 
123         // socket again
124             state =  STATE_CLOSED;
125             responseComplete();
126         } else {
127             state = STATE_CLOSED;
128         }
129       
130       if (sentRequests.empty()) {
131         return;
132       }
133       
134     // restore sent requests to the queue, so they will be re-sent
135     // when the connection opens again
136       queuedRequests.insert(queuedRequests.begin(),
137                               sentRequests.begin(), sentRequests.end());
138       sentRequests.clear();
139     }
140     
141     void queueRequest(const Request_ptr& r)
142     {
143       queuedRequests.push_back(r);
144       tryStartNextRequest();
145     }
146     
147     void beginResponse()
148     {
149       assert(!sentRequests.empty());
150       
151       activeRequest = sentRequests.front();      
152       activeRequest->responseStart(buffer);
153       state = STATE_GETTING_HEADERS;
154       buffer.clear();
155       if (activeRequest->responseCode() == 204) {
156         noMessageBody = true;
157       } else if (activeRequest->method() == "HEAD") {
158         noMessageBody = true;
159       } else {
160         noMessageBody = false;
161       }
162
163       bodyTransferSize = -1;
164       chunkedTransfer = false;
165       contentGZip = contentDeflate = false;
166     }
167   
168     void tryStartNextRequest()
169     {
170       if (queuedRequests.empty()) {
171         idleTime.stamp();
172         return;
173       }
174       
175       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
176         return;
177       }
178       
179       if (state == STATE_CLOSED) {
180           if (!connectToHost()) {
181               return;
182           }
183           
184           setTerminator("\r\n");
185           state = STATE_IDLE;
186       }
187      
188       Request_ptr r = queuedRequests.front();
189       requestBodyBytesToSend = r->requestBodyLength();
190           
191       stringstream headerData;
192       string path = r->path();
193       assert(!path.empty());
194       string query = r->query();
195       string bodyData;
196       
197       if (!client->proxyHost().empty()) {
198           path = r->scheme() + "://" + r->host() + r->path();
199       }
200
201       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
202           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
203           bodyData = query.substr(1); // URL-encode, drop the leading '?'
204           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
205           headerData << "Content-Length:" << bodyData.size() << "\r\n";
206       } else {
207           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
208           if (requestBodyBytesToSend >= 0) {
209             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
210             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
211           }
212       }
213       
214       headerData << "Host: " << r->hostAndPort() << "\r\n";
215       headerData << "User-Agent:" << client->userAgent() << "\r\n";
216       headerData << "Accept-Encoding: deflate, gzip\r\n";
217       if (!client->proxyAuth().empty()) {
218           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
219       }
220
221       BOOST_FOREACH(string h, r->requestHeaders()) {
222           headerData << h << ": " << r->header(h) << "\r\n";
223       }
224
225       headerData << "\r\n"; // final CRLF to terminate the headers
226       if (!bodyData.empty()) {
227           headerData << bodyData;
228       }
229       
230       bool ok = push(headerData.str().c_str());
231       if (!ok) {
232           // we've over-stuffed the socket, give up for now, let things
233           // drain down before trying to start any more requests.
234           return;
235       }
236       
237       while (requestBodyBytesToSend > 0) {
238         char buf[4096];
239         int len = 4096;
240         r->getBodyData(buf, len);
241         if (len > 0) {
242           requestBodyBytesToSend -= len;
243           if (!bufferSend(buf, len)) {
244             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
245             state = STATE_SOCKET_ERROR;
246             return;
247           }
248         } else {
249           SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
250           break;
251         }
252       }
253       
254       //std::cout << "did send request:" << r->url() << std::endl;
255     // successfully sent, remove from queue, and maybe send the next
256       queuedRequests.pop_front();
257       sentRequests.push_back(r);
258       
259     // pipelining, let's maybe send the next request right away
260       tryStartNextRequest();
261     }
262     
263     virtual void collectIncomingData(const char* s, int n)
264     {
265         idleTime.stamp();
266         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
267           if (contentGZip || contentDeflate) {
268             expandCompressedData(s, n);
269           } else {
270             activeRequest->processBodyBytes(s, n);
271           }
272         } else {
273             buffer += string(s, n);
274         }
275     }
276
277     
278     void expandCompressedData(const char* s, int n)
279     {
280       int reqSize = n + zlib.avail_in;
281       if (reqSize > zlibInflateBufferSize) {
282       // reallocate
283         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
284         memcpy(newBuf, zlib.next_in, zlib.avail_in);
285         memcpy(newBuf + zlib.avail_in, s, n);
286         free(zlibInflateBuffer);
287         zlibInflateBuffer = newBuf;
288         zlibInflateBufferSize = reqSize;
289       } else {
290       // important to use memmove here, since it's very likely
291       // the source and destination ranges overlap
292         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
293         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
294       }
295             
296       zlib.next_in = (unsigned char*) zlibInflateBuffer;
297       zlib.avail_in = reqSize;
298       zlib.next_out = zlibOutputBuffer;
299       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
300       
301       if (contentGZip && !handleGZipHeader()) {
302           return;
303       }
304       
305       int writtenSize = 0;
306       do {
307         int result = inflate(&zlib, Z_NO_FLUSH);
308         if (result == Z_OK || result == Z_STREAM_END) {
309             // nothing to do
310         } else {
311           SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
312           return;
313         }
314             
315         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
316         if (result == Z_STREAM_END) {
317             break;
318         }
319       } while ((writtenSize == 0) && (zlib.avail_in > 0));
320     
321       if (writtenSize > 0) {
322         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
323       }
324     }
325     
326     bool handleGZipHeader()
327     {
328         // we clear this down to contentDeflate once the GZip header has been seen
329         if (zlib.avail_in < GZIP_HEADER_SIZE) {
330           return false; // need more header bytes
331         }
332         
333         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
334             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
335             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
336         {
337           return false; // invalid GZip header
338         }
339         
340         char flags = zlibInflateBuffer[3];
341         int gzipHeaderSize =  GZIP_HEADER_SIZE;
342         if (flags & GZIP_HEADER_FEXTRA) {
343           gzipHeaderSize += 2;
344           if (zlib.avail_in < gzipHeaderSize) {
345             return false; // need more header bytes
346           }
347           
348           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
349           if ( sgIsBigEndian() ) {
350               sgEndianSwap( &extraHeaderBytes );
351           }
352           
353           gzipHeaderSize += extraHeaderBytes;
354           if (zlib.avail_in < gzipHeaderSize) {
355             return false; // need more header bytes
356           }
357         }
358         
359         if (flags & GZIP_HEADER_FNAME) {
360           gzipHeaderSize++;
361           while (gzipHeaderSize <= zlib.avail_in) {
362             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
363               break; // found terminating NULL character
364             }
365           }
366         }
367         
368         if (flags & GZIP_HEADER_COMMENT) {
369           gzipHeaderSize++;
370           while (gzipHeaderSize <= zlib.avail_in) {
371             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
372               break; // found terminating NULL character
373             }
374           }
375         }
376         
377         if (flags & GZIP_HEADER_CRC) {
378           gzipHeaderSize += 2;
379         }
380         
381         if (zlib.avail_in < gzipHeaderSize) {
382           return false; // need more header bytes
383         }
384         
385         zlib.next_in += gzipHeaderSize;
386         zlib.avail_in -= gzipHeaderSize;
387       // now we've processed the GZip header, can decode as deflate
388         contentGZip = false;
389         contentDeflate = true;
390         return true;
391     }
392     
393     virtual void foundTerminator(void)
394     {
395         idleTime.stamp();
396         switch (state) {
397         case STATE_IDLE:
398             beginResponse();
399             break;
400             
401         case STATE_GETTING_HEADERS:
402             processHeader();
403             buffer.clear();
404             break;
405             
406         case STATE_GETTING_BODY:
407             responseComplete();
408             break;
409         
410         case STATE_GETTING_CHUNKED:
411             processChunkHeader();
412             break;
413             
414         case STATE_GETTING_CHUNKED_BYTES:
415             setTerminator("\r\n");
416             state = STATE_GETTING_CHUNKED;
417             buffer.clear();
418             break;
419             
420
421         case STATE_GETTING_TRAILER:
422             processTrailer();
423             buffer.clear();
424             break;
425         
426         default:
427             break;
428         }
429     }
430     
431     bool hasIdleTimeout() const
432     {
433         if (state != STATE_IDLE) {
434             return false;
435         }
436         
437         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
438     }
439   
440     bool hasErrorTimeout() const
441     {
442       if (state == STATE_IDLE) {
443         return false;
444       }
445       
446       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
447     }
448     
449     bool hasError() const
450     {
451         return (state == STATE_SOCKET_ERROR);
452     }
453     
454     bool shouldStartNext() const
455     {
456       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
457     }
458 private:
459     bool connectToHost()
460     {
461         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
462         
463         if (!open()) {
464             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
465             return false;
466         }
467         
468         if (connect(host.c_str(), port) != 0) {
469             return false;
470         }
471         
472         return true;
473     }
474     
475     
476     void processHeader()
477     {
478         string h = strutils::simplify(buffer);
479         if (h.empty()) { // blank line terminates headers
480             headersComplete();
481             
482             if (contentGZip || contentDeflate) {
483                 memset(&zlib, 0, sizeof(z_stream));
484                 if (!zlibOutputBuffer) {
485                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
486                 }
487             
488                 // NULLs means we'll get default alloc+free methods
489                 // which is absolutely fine
490                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
491                 zlib.next_out = zlibOutputBuffer;
492                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
493                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
494                 }
495             }
496           
497             if (chunkedTransfer) {
498                 state = STATE_GETTING_CHUNKED;
499             } else if (noMessageBody || (bodyTransferSize == 0)) {
500                 // force the state to GETTING_BODY, to simplify logic in
501                 // responseComplete and handleClose
502                 state = STATE_GETTING_BODY;
503                 responseComplete();
504             } else {
505                 setByteCount(bodyTransferSize); // may be -1, that's fine              
506                 state = STATE_GETTING_BODY;
507             }
508             
509             return;
510         }
511               
512         int colonPos = buffer.find(':');
513         if (colonPos < 0) {
514             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
515             return;
516         }
517         
518         string key = strutils::simplify(buffer.substr(0, colonPos));
519         string lkey = boost::to_lower_copy(key);
520         string value = strutils::strip(buffer.substr(colonPos + 1));
521         
522         // only consider these if getting headers (as opposed to trailers 
523         // of a chunked transfer)
524         if (state == STATE_GETTING_HEADERS) {
525             if (lkey == "content-length") {
526
527                 int sz = strutils::to_int(value);
528                 if (bodyTransferSize <= 0) {
529                     bodyTransferSize = sz;
530                 }
531                 activeRequest->setResponseLength(sz);
532             } else if (lkey == "transfer-length") {
533                 bodyTransferSize = strutils::to_int(value);
534             } else if (lkey == "transfer-encoding") {
535                 processTransferEncoding(value);
536             } else if (lkey == "content-encoding") {
537               if (value == "gzip") {
538                 contentGZip = true;
539               } else if (value == "deflate") {
540                 contentDeflate = true;
541               } else if (value != "identity") {
542                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
543               }
544             }
545         }
546     
547         activeRequest->responseHeader(lkey, value);
548     }
549     
550     void processTransferEncoding(const string& te)
551     {
552         if (te == "chunked") {
553             chunkedTransfer = true;
554         } else {
555             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
556             // failure
557         }
558     }
559     
560     void processChunkHeader()
561     {
562         if (buffer.empty()) {
563             // blank line after chunk data
564             return;
565         }
566                 
567         int chunkSize = 0;
568         int semiPos = buffer.find(';');
569         if (semiPos >= 0) {
570             // extensions ignored for the moment
571             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
572         } else {
573             chunkSize = strutils::to_int(buffer, 16);
574         }
575         
576         buffer.clear();
577         if (chunkSize == 0) {  //  trailer start
578             state = STATE_GETTING_TRAILER;
579             return;
580         }
581         
582         state = STATE_GETTING_CHUNKED_BYTES;
583         setByteCount(chunkSize);
584     }
585     
586     void processTrailer()
587     {        
588         if (buffer.empty()) {
589             // end of trailers
590             responseComplete();
591             return;
592         }
593         
594     // process as a normal header
595         processHeader();
596     }
597     
598     void headersComplete()
599     {
600         activeRequest->responseHeadersComplete();
601     }
602     
603     void responseComplete()
604     {
605       //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
606         activeRequest->responseComplete();
607         client->requestFinished(this);
608       
609         if (contentDeflate) {
610           inflateEnd(&zlib);
611         }
612       
613         assert(sentRequests.front() == activeRequest);
614         sentRequests.pop_front();
615         bool doClose = activeRequest->closeAfterComplete();
616         activeRequest = NULL;
617       
618         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
619             if (doClose) {
620           // this will bring us into handleClose() above, which updates
621           // state to STATE_CLOSED
622             close();
623               
624           // if we have additional requests waiting, try to start them now
625             tryStartNextRequest();
626           }
627         }
628         
629       if (state != STATE_CLOSED)  {
630         state = STATE_IDLE;
631       }
632       setTerminator("\r\n");
633     }
634     
635     enum ConnectionState {
636         STATE_IDLE = 0,
637         STATE_GETTING_HEADERS,
638         STATE_GETTING_BODY,
639         STATE_GETTING_CHUNKED,
640         STATE_GETTING_CHUNKED_BYTES,
641         STATE_GETTING_TRAILER,
642         STATE_SOCKET_ERROR,
643         STATE_CLOSED             ///< connection should be closed now
644     };
645     
646     Client* client;
647     Request_ptr activeRequest;
648     ConnectionState state;
649     string host;
650     short port;
651     std::string buffer;
652     int bodyTransferSize;
653     SGTimeStamp idleTime;
654     bool chunkedTransfer;
655     bool noMessageBody;
656     int requestBodyBytesToSend;
657   
658     z_stream zlib;
659     unsigned char* zlibInflateBuffer;
660     int zlibInflateBufferSize;
661     unsigned char* zlibOutputBuffer;
662     bool contentGZip, contentDeflate;
663   
664     std::list<Request_ptr> queuedRequests;
665     std::list<Request_ptr> sentRequests;
666 };
667
668 Client::Client()
669 {
670     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
671 }
672
673 void Client::update(int waitTimeout)
674 {
675     NetChannel::poll(waitTimeout);
676         
677     ConnectionDict::iterator it = _connections.begin();
678     for (; it != _connections.end(); ) {
679         if (it->second->hasIdleTimeout() || it->second->hasError() ||
680             it->second->hasErrorTimeout())
681         {
682         // connection has been idle for a while, clean it up
683         // (or has an error condition, again clean it up)
684             ConnectionDict::iterator del = it++;
685             delete del->second;
686             _connections.erase(del);
687         } else {
688             if (it->second->shouldStartNext()) {
689                 it->second->tryStartNextRequest();
690             }
691             
692             ++it;
693         }
694     } // of connecion iteration
695 }
696
697 void Client::makeRequest(const Request_ptr& r)
698 {
699     string host = r->host();
700     int port = r->port();
701     if (!_proxy.empty()) {
702         host = _proxy;
703         port = _proxyPort;
704     }
705     
706     stringstream ss;
707     ss << host << "-" << port;
708     string connectionId = ss.str();
709     
710     if (_connections.find(connectionId) == _connections.end()) {
711         Connection* con = new Connection(this);
712         con->setServer(host, port);
713         _connections[connectionId] = con;
714     }
715     
716     _connections[connectionId]->queueRequest(r);
717 }
718
719 void Client::requestFinished(Connection* con)
720 {
721     
722 }
723
724 void Client::setUserAgent(const string& ua)
725 {
726     _userAgent = ua;
727 }
728
729 void Client::setProxy(const string& proxy, int port, const string& auth)
730 {
731     _proxy = proxy;
732     _proxyPort = port;
733     _proxyAuth = auth;
734 }
735
736 } // of namespace HTTP
737
738 } // of namespace simgear