]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
ba68a94a4b0f60947cee8a47f6ff3c949b8e54d7
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7 #include <errno.h>
8
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
11
12 #include <zlib.h>
13
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
20
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
22 #include "version.h"
23 #else
24 #  if !defined(SIMGEAR_VERSION)
25 #    define SIMGEAR_VERSION "simgear-development"
26 #  endif
27 #endif
28
29 using std::string;
30 using std::stringstream;
31 using std::vector;
32
33 namespace simgear
34 {
35
36 namespace HTTP
37 {
38
39 extern const int DEFAULT_HTTP_PORT = 80;
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
44   
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic 
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
50 const unsigned int GZIP_HEADER_SIZE = 10;
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
55   
56 typedef std::list<Request_ptr> RequestList;
57   
58 class Connection : public NetChat
59 {
60 public:
61     Connection(Client* pr) :
62         client(pr),
63         state(STATE_CLOSED),
64         port(DEFAULT_HTTP_PORT),
65         zlibInflateBuffer(NULL),
66         zlibInflateBufferSize(0),
67         zlibOutputBuffer(NULL)
68     {
69         
70     }
71     
72     virtual ~Connection()
73     {
74       if (zlibInflateBuffer) {
75         free(zlibInflateBuffer);
76       }
77       
78       if (zlibOutputBuffer) {
79         free(zlibOutputBuffer);
80       }
81     }
82   
83     void setServer(const string& h, short p)
84     {
85         host = h;
86         port = p;
87     }
88     
89     // socket-level errors
90     virtual void handleError(int error)
91     {
92         if (error == ENOENT) {
93         // name lookup failure
94             // we won't have an active request yet, so the logic below won't
95             // fire to actually call setFailure. Let's fail all of the requests
96             BOOST_FOREACH(Request_ptr req, sentRequests) {
97                 req->setFailure(error, "hostname lookup failure");
98             }
99             
100             BOOST_FOREACH(Request_ptr req, queuedRequests) {
101                 req->setFailure(error, "hostname lookup failure");
102             }
103             
104         // name lookup failure, abandon all requests on this connection
105             sentRequests.clear();
106             queuedRequests.clear();
107         }
108         
109         NetChat::handleError(error);
110         if (activeRequest) {            
111             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
112             activeRequest->setFailure(error, "socket error");
113             activeRequest = NULL;
114         }
115     
116         state = STATE_SOCKET_ERROR;
117     }
118     
119     virtual void handleClose()
120     {      
121         NetChat::handleClose();
122
123     // closing of the connection from the server side when getting the body,
124         bool canCloseState = (state == STATE_GETTING_BODY);
125         if (canCloseState && activeRequest) {
126         // force state here, so responseComplete can avoid closing the 
127         // socket again
128             state =  STATE_CLOSED;
129             responseComplete();
130         } else {
131             if (activeRequest) {
132                 activeRequest->setFailure(500, "server closed connection");
133                 // remove the failed request from sentRequests, so it does 
134                 // not get restored
135                 RequestList::iterator it = std::find(sentRequests.begin(), 
136                     sentRequests.end(), activeRequest);
137                 if (it != sentRequests.end()) {
138                     sentRequests.erase(it);
139                 }
140                 activeRequest = NULL;
141             }
142             
143             state = STATE_CLOSED;
144         }
145       
146       if (sentRequests.empty()) {
147         return;
148       }
149       
150     // restore sent requests to the queue, so they will be re-sent
151     // when the connection opens again
152       queuedRequests.insert(queuedRequests.begin(),
153                               sentRequests.begin(), sentRequests.end());
154       sentRequests.clear();
155     }
156     
157     void queueRequest(const Request_ptr& r)
158     {
159       queuedRequests.push_back(r);
160       tryStartNextRequest();
161     }
162     
163     void beginResponse()
164     {
165       assert(!sentRequests.empty());
166       
167       activeRequest = sentRequests.front();      
168       activeRequest->responseStart(buffer);
169       state = STATE_GETTING_HEADERS;
170       buffer.clear();
171       if (activeRequest->responseCode() == 204) {
172         noMessageBody = true;
173       } else if (activeRequest->method() == "HEAD") {
174         noMessageBody = true;
175       } else {
176         noMessageBody = false;
177       }
178
179       bodyTransferSize = -1;
180       chunkedTransfer = false;
181       contentGZip = contentDeflate = false;
182     }
183   
184     void tryStartNextRequest()
185     {
186       if (queuedRequests.empty()) {
187         idleTime.stamp();
188         return;
189       }
190       
191       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
192         return;
193       }
194       
195       if (state == STATE_CLOSED) {
196           if (!connectToHost()) {
197               return;
198           }
199           
200           setTerminator("\r\n");
201           state = STATE_IDLE;
202       }
203      
204       Request_ptr r = queuedRequests.front();
205       r->requestStart();
206       requestBodyBytesToSend = r->requestBodyLength();
207           
208       stringstream headerData;
209       string path = r->path();
210       assert(!path.empty());
211       string query = r->query();
212       string bodyData;
213       
214       if (!client->proxyHost().empty()) {
215           path = r->scheme() + "://" + r->host() + r->path();
216       }
217
218       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
219           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
220           bodyData = query.substr(1); // URL-encode, drop the leading '?'
221           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
222           headerData << "Content-Length:" << bodyData.size() << "\r\n";
223       } else {
224           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
225           if (requestBodyBytesToSend >= 0) {
226             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
227             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
228           }
229       }
230       
231       headerData << "Host: " << r->hostAndPort() << "\r\n";
232       headerData << "User-Agent:" << client->userAgent() << "\r\n";
233       headerData << "Accept-Encoding: deflate, gzip\r\n";
234       if (!client->proxyAuth().empty()) {
235           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
236       }
237
238       BOOST_FOREACH(string h, r->requestHeaders()) {
239           headerData << h << ": " << r->header(h) << "\r\n";
240       }
241
242       headerData << "\r\n"; // final CRLF to terminate the headers
243       if (!bodyData.empty()) {
244           headerData << bodyData;
245       }
246       
247       bool ok = push(headerData.str().c_str());
248       if (!ok) {
249           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
250           // we've over-stuffed the socket, give up for now, let things
251           // drain down before trying to start any more requests.
252           return;
253       }
254       
255       while (requestBodyBytesToSend > 0) {
256         char buf[4096];
257         int len = r->getBodyData(buf, 4096);
258         if (len > 0) {
259           requestBodyBytesToSend -= len;
260           if (!bufferSend(buf, len)) {
261             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
262             state = STATE_SOCKET_ERROR;
263             return;
264           }
265       //    SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
266         } else {
267           SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
268           break;
269         }
270       }
271       
272       SG_LOG(SG_IO, SG_DEBUG, "did start request:" << r->url() <<
273           "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
274           "\n\t on connection " << this);
275     // successfully sent, remove from queue, and maybe send the next
276       queuedRequests.pop_front();
277       sentRequests.push_back(r);
278       
279     // pipelining, let's maybe send the next request right away
280       tryStartNextRequest();
281     }
282     
283     virtual void collectIncomingData(const char* s, int n)
284     {
285         idleTime.stamp();
286         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
287           if (contentGZip || contentDeflate) {
288             expandCompressedData(s, n);
289           } else {
290             activeRequest->processBodyBytes(s, n);
291           }
292         } else {
293             buffer += string(s, n);
294         }
295     }
296
297     
298     void expandCompressedData(const char* s, int n)
299     {
300       int reqSize = n + zlib.avail_in;
301       if (reqSize > zlibInflateBufferSize) {
302       // reallocate
303         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
304         memcpy(newBuf, zlib.next_in, zlib.avail_in);
305         memcpy(newBuf + zlib.avail_in, s, n);
306         free(zlibInflateBuffer);
307         zlibInflateBuffer = newBuf;
308         zlibInflateBufferSize = reqSize;
309       } else {
310       // important to use memmove here, since it's very likely
311       // the source and destination ranges overlap
312         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
313         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
314       }
315             
316       zlib.next_in = (unsigned char*) zlibInflateBuffer;
317       zlib.avail_in = reqSize;
318       zlib.next_out = zlibOutputBuffer;
319       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
320       
321       if (contentGZip && !handleGZipHeader()) {
322           return;
323       }
324       
325       int writtenSize = 0;
326       do {
327         int result = inflate(&zlib, Z_NO_FLUSH);
328         if (result == Z_OK || result == Z_STREAM_END) {
329             // nothing to do
330         } else {
331           SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
332           return;
333         }
334             
335         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
336         if (result == Z_STREAM_END) {
337             break;
338         }
339       } while ((writtenSize == 0) && (zlib.avail_in > 0));
340     
341       if (writtenSize > 0) {
342         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
343       }
344     }
345     
346     bool handleGZipHeader()
347     {
348         // we clear this down to contentDeflate once the GZip header has been seen
349         if (zlib.avail_in < GZIP_HEADER_SIZE) {
350           return false; // need more header bytes
351         }
352         
353         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
354             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
355             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
356         {
357           return false; // invalid GZip header
358         }
359         
360         char flags = zlibInflateBuffer[3];
361         unsigned int gzipHeaderSize =  GZIP_HEADER_SIZE;
362         if (flags & GZIP_HEADER_FEXTRA) {
363           gzipHeaderSize += 2;
364           if (zlib.avail_in < gzipHeaderSize) {
365             return false; // need more header bytes
366           }
367           
368           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
369           if ( sgIsBigEndian() ) {
370               sgEndianSwap( &extraHeaderBytes );
371           }
372           
373           gzipHeaderSize += extraHeaderBytes;
374           if (zlib.avail_in < gzipHeaderSize) {
375             return false; // need more header bytes
376           }
377         }
378         
379         if (flags & GZIP_HEADER_FNAME) {
380           gzipHeaderSize++;
381           while (gzipHeaderSize <= zlib.avail_in) {
382             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
383               break; // found terminating NULL character
384             }
385           }
386         }
387         
388         if (flags & GZIP_HEADER_COMMENT) {
389           gzipHeaderSize++;
390           while (gzipHeaderSize <= zlib.avail_in) {
391             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
392               break; // found terminating NULL character
393             }
394           }
395         }
396         
397         if (flags & GZIP_HEADER_CRC) {
398           gzipHeaderSize += 2;
399         }
400         
401         if (zlib.avail_in < gzipHeaderSize) {
402           return false; // need more header bytes
403         }
404         
405         zlib.next_in += gzipHeaderSize;
406         zlib.avail_in -= gzipHeaderSize;
407       // now we've processed the GZip header, can decode as deflate
408         contentGZip = false;
409         contentDeflate = true;
410         return true;
411     }
412     
413     virtual void foundTerminator(void)
414     {
415         idleTime.stamp();
416         switch (state) {
417         case STATE_IDLE:
418             beginResponse();
419             break;
420             
421         case STATE_GETTING_HEADERS:
422             processHeader();
423             buffer.clear();
424             break;
425             
426         case STATE_GETTING_BODY:
427             responseComplete();
428             break;
429         
430         case STATE_GETTING_CHUNKED:
431             processChunkHeader();
432             break;
433             
434         case STATE_GETTING_CHUNKED_BYTES:
435             setTerminator("\r\n");
436             state = STATE_GETTING_CHUNKED;
437             buffer.clear();
438             break;
439             
440
441         case STATE_GETTING_TRAILER:
442             processTrailer();
443             buffer.clear();
444             break;
445         
446         default:
447             break;
448         }
449     }
450     
451     bool hasIdleTimeout() const
452     {
453         if (state != STATE_IDLE) {
454             return false;
455         }
456         
457         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
458     }
459   
460     bool hasErrorTimeout() const
461     {
462       if (state == STATE_IDLE) {
463         return false;
464       }
465       
466       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
467     }
468     
469     bool hasError() const
470     {
471         return (state == STATE_SOCKET_ERROR);
472     }
473     
474     bool shouldStartNext() const
475     {
476       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
477     }
478     
479     bool isActive() const
480     {
481         return !queuedRequests.empty() || !sentRequests.empty();
482     }
483 private:
484     bool connectToHost()
485     {
486         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
487         
488         if (!open()) {
489             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
490             return false;
491         }
492         
493         if (connect(host.c_str(), port) != 0) {
494             return false;
495         }
496         
497         return true;
498     }
499     
500     
501     void processHeader()
502     {
503         string h = strutils::simplify(buffer);
504         if (h.empty()) { // blank line terminates headers
505             headersComplete();
506             
507             if (contentGZip || contentDeflate) {
508                 memset(&zlib, 0, sizeof(z_stream));
509                 if (!zlibOutputBuffer) {
510                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
511                 }
512             
513                 // NULLs means we'll get default alloc+free methods
514                 // which is absolutely fine
515                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
516                 zlib.next_out = zlibOutputBuffer;
517                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
518                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
519                 }
520             }
521           
522             if (chunkedTransfer) {
523                 state = STATE_GETTING_CHUNKED;
524             } else if (noMessageBody || (bodyTransferSize == 0)) {
525                 // force the state to GETTING_BODY, to simplify logic in
526                 // responseComplete and handleClose
527                 state = STATE_GETTING_BODY;
528                 responseComplete();
529             } else {
530                 setByteCount(bodyTransferSize); // may be -1, that's fine              
531                 state = STATE_GETTING_BODY;
532             }
533             
534             return;
535         }
536               
537         int colonPos = buffer.find(':');
538         if (colonPos < 0) {
539             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
540             return;
541         }
542         
543         string key = strutils::simplify(buffer.substr(0, colonPos));
544         string lkey = boost::to_lower_copy(key);
545         string value = strutils::strip(buffer.substr(colonPos + 1));
546         
547         // only consider these if getting headers (as opposed to trailers 
548         // of a chunked transfer)
549         if (state == STATE_GETTING_HEADERS) {
550             if (lkey == "content-length") {
551
552                 int sz = strutils::to_int(value);
553                 if (bodyTransferSize <= 0) {
554                     bodyTransferSize = sz;
555                 }
556                 activeRequest->setResponseLength(sz);
557             } else if (lkey == "transfer-length") {
558                 bodyTransferSize = strutils::to_int(value);
559             } else if (lkey == "transfer-encoding") {
560                 processTransferEncoding(value);
561             } else if (lkey == "content-encoding") {
562               if (value == "gzip") {
563                 contentGZip = true;
564               } else if (value == "deflate") {
565                 contentDeflate = true;
566               } else if (value != "identity") {
567                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
568               }
569             }
570         }
571     
572         activeRequest->responseHeader(lkey, value);
573     }
574     
575     void processTransferEncoding(const string& te)
576     {
577         if (te == "chunked") {
578             chunkedTransfer = true;
579         } else {
580             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
581             // failure
582         }
583     }
584     
585     void processChunkHeader()
586     {
587         if (buffer.empty()) {
588             // blank line after chunk data
589             return;
590         }
591                 
592         int chunkSize = 0;
593         int semiPos = buffer.find(';');
594         if (semiPos >= 0) {
595             // extensions ignored for the moment
596             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
597         } else {
598             chunkSize = strutils::to_int(buffer, 16);
599         }
600         
601         buffer.clear();
602         if (chunkSize == 0) {  //  trailer start
603             state = STATE_GETTING_TRAILER;
604             return;
605         }
606         
607         state = STATE_GETTING_CHUNKED_BYTES;
608         setByteCount(chunkSize);
609     }
610     
611     void processTrailer()
612     {        
613         if (buffer.empty()) {
614             // end of trailers
615             responseComplete();
616             return;
617         }
618         
619     // process as a normal header
620         processHeader();
621     }
622     
623     void headersComplete()
624     {
625         activeRequest->responseHeadersComplete();
626     }
627     
628     void responseComplete()
629     {
630      //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
631         activeRequest->responseComplete();
632         client->requestFinished(this);
633       
634         if (contentDeflate) {
635           inflateEnd(&zlib);
636         }
637       
638         assert(sentRequests.front() == activeRequest);
639         sentRequests.pop_front();
640         bool doClose = activeRequest->closeAfterComplete();
641         activeRequest = NULL;
642       
643         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
644             if (doClose) {
645           // this will bring us into handleClose() above, which updates
646           // state to STATE_CLOSED
647             close();
648               
649           // if we have additional requests waiting, try to start them now
650             tryStartNextRequest();
651           }
652         }
653         
654       if (state != STATE_CLOSED)  {
655         state = STATE_IDLE;
656       }
657       setTerminator("\r\n");
658     }
659     
660     enum ConnectionState {
661         STATE_IDLE = 0,
662         STATE_GETTING_HEADERS,
663         STATE_GETTING_BODY,
664         STATE_GETTING_CHUNKED,
665         STATE_GETTING_CHUNKED_BYTES,
666         STATE_GETTING_TRAILER,
667         STATE_SOCKET_ERROR,
668         STATE_CLOSED             ///< connection should be closed now
669     };
670     
671     Client* client;
672     Request_ptr activeRequest;
673     ConnectionState state;
674     string host;
675     short port;
676     std::string buffer;
677     int bodyTransferSize;
678     SGTimeStamp idleTime;
679     bool chunkedTransfer;
680     bool noMessageBody;
681     int requestBodyBytesToSend;
682   
683     z_stream zlib;
684     unsigned char* zlibInflateBuffer;
685     int zlibInflateBufferSize;
686     unsigned char* zlibOutputBuffer;
687     bool contentGZip, contentDeflate;
688   
689     RequestList queuedRequests;
690     RequestList sentRequests;
691 };
692
693 Client::Client()
694 {
695     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
696 }
697
698 void Client::update(int waitTimeout)
699 {
700     _poller.poll(waitTimeout);
701         
702     ConnectionDict::iterator it = _connections.begin();
703     for (; it != _connections.end(); ) {
704         if (it->second->hasIdleTimeout() || it->second->hasError() ||
705             it->second->hasErrorTimeout())
706         {
707         // connection has been idle for a while, clean it up
708         // (or has an error condition, again clean it up)
709             ConnectionDict::iterator del = it++;
710             delete del->second;
711             _connections.erase(del);
712         } else {
713             if (it->second->shouldStartNext()) {
714                 it->second->tryStartNextRequest();
715             }
716             
717             ++it;
718         }
719     } // of connecion iteration
720 }
721
722 void Client::makeRequest(const Request_ptr& r)
723 {
724     string host = r->host();
725     int port = r->port();
726     if (!_proxy.empty()) {
727         host = _proxy;
728         port = _proxyPort;
729     }
730     
731     stringstream ss;
732     ss << host << "-" << port;
733     string connectionId = ss.str();
734     
735     if (_connections.find(connectionId) == _connections.end()) {
736         Connection* con = new Connection(this);
737         con->setServer(host, port);
738         _poller.addChannel(con);
739         _connections[connectionId] = con;
740     }
741     
742     _connections[connectionId]->queueRequest(r);
743 }
744
745 void Client::requestFinished(Connection* con)
746 {
747     
748 }
749
750 void Client::setUserAgent(const string& ua)
751 {
752     _userAgent = ua;
753 }
754
755 void Client::setProxy(const string& proxy, int port, const string& auth)
756 {
757     _proxy = proxy;
758     _proxyPort = port;
759     _proxyAuth = auth;
760 }
761
762 bool Client::hasActiveRequests() const
763 {
764     ConnectionDict::const_iterator it = _connections.begin();
765     for (; it != _connections.end(); ++it) {
766         if (it->second->isActive()) return true;
767     }
768     
769     return false;
770 }
771
772 } // of namespace HTTP
773
774 } // of namespace simgear