]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
7a32ab4903090c0a128511416dde86b7e7a48ec2
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7 #include <errno.h>
8
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
11
12 #include <zlib.h>
13
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
20
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
22 #include "version.h"
23 #else
24 #  if !defined(SIMGEAR_VERSION)
25 #    define SIMGEAR_VERSION "simgear-development"
26 #  endif
27 #endif
28
29 using std::string;
30 using std::stringstream;
31 using std::vector;
32
33 namespace simgear
34 {
35
36 namespace HTTP
37 {
38
39 extern const int DEFAULT_HTTP_PORT = 80;
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
44   
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic 
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
50 const unsigned int GZIP_HEADER_SIZE = 10;
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
55   
56 class Connection : public NetChat
57 {
58 public:
59     Connection(Client* pr) :
60         client(pr),
61         state(STATE_CLOSED),
62         port(DEFAULT_HTTP_PORT),
63         zlibInflateBuffer(NULL),
64         zlibInflateBufferSize(0),
65         zlibOutputBuffer(NULL)
66     {
67         
68     }
69     
70     virtual ~Connection()
71     {
72       if (zlibInflateBuffer) {
73         free(zlibInflateBuffer);
74       }
75       
76       if (zlibOutputBuffer) {
77         free(zlibOutputBuffer);
78       }
79     }
80   
81     void setServer(const string& h, short p)
82     {
83         host = h;
84         port = p;
85     }
86     
87     // socket-level errors
88     virtual void handleError(int error)
89     {
90         if (error == ENOENT) {
91         // name lookup failure
92             // we won't have an active request yet, so the logic below won't
93             // fire to actually call setFailure. Let's fail all of the requests
94             BOOST_FOREACH(Request_ptr req, sentRequests) {
95                 req->setFailure(error, "hostname lookup failure");
96             }
97             
98             BOOST_FOREACH(Request_ptr req, queuedRequests) {
99                 req->setFailure(error, "hostname lookup failure");
100             }
101             
102         // name lookup failure, abandon all requests on this connection
103             sentRequests.clear();
104             queuedRequests.clear();
105         }
106         
107         NetChat::handleError(error);
108         if (activeRequest) {            
109             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
110             activeRequest->setFailure(error, "socket error");
111             activeRequest = NULL;
112         }
113     
114         state = STATE_SOCKET_ERROR;
115     }
116     
117     virtual void handleClose()
118     {      
119         NetChat::handleClose();
120         
121         if ((state == STATE_GETTING_BODY) && activeRequest) {
122         // force state here, so responseComplete can avoid closing the 
123         // socket again
124             state =  STATE_CLOSED;
125             responseComplete();
126         } else {
127             state = STATE_CLOSED;
128         }
129       
130       if (sentRequests.empty()) {
131         return;
132       }
133       
134     // restore sent requests to the queue, so they will be re-sent
135     // when the connection opens again
136       queuedRequests.insert(queuedRequests.begin(),
137                               sentRequests.begin(), sentRequests.end());
138       sentRequests.clear();
139     }
140     
141     void queueRequest(const Request_ptr& r)
142     {
143       queuedRequests.push_back(r);
144       tryStartNextRequest();
145     }
146     
147     void beginResponse()
148     {
149       assert(!sentRequests.empty());
150       
151       activeRequest = sentRequests.front();      
152       activeRequest->responseStart(buffer);
153       state = STATE_GETTING_HEADERS;
154       buffer.clear();
155       if (activeRequest->responseCode() == 204) {
156         noMessageBody = true;
157       } else if (activeRequest->method() == "HEAD") {
158         noMessageBody = true;
159       } else {
160         noMessageBody = false;
161       }
162
163       bodyTransferSize = -1;
164       chunkedTransfer = false;
165       contentGZip = contentDeflate = false;
166     }
167   
168     void tryStartNextRequest()
169     {
170       if (queuedRequests.empty()) {
171         idleTime.stamp();
172         return;
173       }
174       
175       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
176         return;
177       }
178       
179       if (state == STATE_CLOSED) {
180           if (!connectToHost()) {
181               return;
182           }
183           
184           setTerminator("\r\n");
185           state = STATE_IDLE;
186       }
187      
188       Request_ptr r = queuedRequests.front();
189       requestBodyBytesToSend = r->requestBodyLength();
190           
191       stringstream headerData;
192       string path = r->path();
193       assert(!path.empty());
194       string query = r->query();
195       string bodyData;
196       
197       if (!client->proxyHost().empty()) {
198           path = r->scheme() + "://" + r->host() + r->path();
199       }
200
201       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
202           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
203           bodyData = query.substr(1); // URL-encode, drop the leading '?'
204           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
205           headerData << "Content-Length:" << bodyData.size() << "\r\n";
206       } else {
207           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
208           if (requestBodyBytesToSend >= 0) {
209             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
210             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
211           }
212       }
213       
214       headerData << "Host: " << r->hostAndPort() << "\r\n";
215       headerData << "User-Agent:" << client->userAgent() << "\r\n";
216       headerData << "Accept-Encoding: deflate, gzip\r\n";
217       if (!client->proxyAuth().empty()) {
218           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
219       }
220
221       BOOST_FOREACH(string h, r->requestHeaders()) {
222           headerData << h << ": " << r->header(h) << "\r\n";
223       }
224
225       headerData << "\r\n"; // final CRLF to terminate the headers
226       if (!bodyData.empty()) {
227           headerData << bodyData;
228       }
229       
230       bool ok = push(headerData.str().c_str());
231       if (!ok) {
232           // we've over-stuffed the socket, give up for now, let things
233           // drain down before trying to start any more requests.
234           return;
235       }
236       
237       while (requestBodyBytesToSend > 0) {
238         char buf[4096];
239         int len = 4096;
240         r->getBodyData(buf, len);
241         if (len > 0) {
242           requestBodyBytesToSend -= len;
243           if (!bufferSend(buf, len)) {
244             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
245             state = STATE_SOCKET_ERROR;
246             return;
247           }
248         } else {
249           SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
250           break;
251         }
252       }
253       
254       //std::cout << "did send request:" << r->url() << std::endl;
255     // successfully sent, remove from queue, and maybe send the next
256       queuedRequests.pop_front();
257       sentRequests.push_back(r);
258       
259     // pipelining, let's maybe send the next request right away
260       tryStartNextRequest();
261     }
262     
263     virtual void collectIncomingData(const char* s, int n)
264     {
265         idleTime.stamp();
266         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
267           if (contentGZip || contentDeflate) {
268             expandCompressedData(s, n);
269           } else {
270             activeRequest->processBodyBytes(s, n);
271           }
272         } else {
273             buffer += string(s, n);
274         }
275     }
276
277     
278     void expandCompressedData(const char* s, int n)
279     {
280       int reqSize = n + zlib.avail_in;
281       if (reqSize > zlibInflateBufferSize) {
282       // reallocate
283         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
284         memcpy(newBuf, zlib.next_in, zlib.avail_in);
285         memcpy(newBuf + zlib.avail_in, s, n);
286         free(zlibInflateBuffer);
287         zlibInflateBuffer = newBuf;
288         zlibInflateBufferSize = reqSize;
289       } else {
290       // important to use memmove here, since it's very likely
291       // the source and destination ranges overlap
292         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
293         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
294       }
295             
296       zlib.next_in = (unsigned char*) zlibInflateBuffer;
297       zlib.avail_in = reqSize;
298       zlib.next_out = zlibOutputBuffer;
299       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
300       
301       if (contentGZip && !handleGZipHeader()) {
302           return;
303       }
304       
305       int writtenSize = 0;
306       do {
307         int result = inflate(&zlib, Z_NO_FLUSH);
308         if (result == Z_OK || result == Z_STREAM_END) {
309             // nothing to do
310         } else {
311           SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
312           return;
313         }
314             
315         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
316         if (result == Z_STREAM_END) {
317             break;
318         }
319       } while ((writtenSize == 0) && (zlib.avail_in > 0));
320     
321       if (writtenSize > 0) {
322         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
323       }
324     }
325     
326     bool handleGZipHeader()
327     {
328         // we clear this down to contentDeflate once the GZip header has been seen
329         if (zlib.avail_in < GZIP_HEADER_SIZE) {
330           return false; // need more header bytes
331         }
332         
333         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
334             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
335             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
336         {
337           return false; // invalid GZip header
338         }
339         
340         char flags = zlibInflateBuffer[3];
341         int gzipHeaderSize =  GZIP_HEADER_SIZE;
342         if (flags & GZIP_HEADER_FEXTRA) {
343           gzipHeaderSize += 2;
344           if (zlib.avail_in < gzipHeaderSize) {
345             return false; // need more header bytes
346           }
347           
348           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
349           if ( sgIsBigEndian() ) {
350               sgEndianSwap( &extraHeaderBytes );
351           }
352           
353           gzipHeaderSize += extraHeaderBytes;
354           if (zlib.avail_in < gzipHeaderSize) {
355             return false; // need more header bytes
356           }
357         }
358         
359         if (flags & GZIP_HEADER_FNAME) {
360           gzipHeaderSize++;
361           while (gzipHeaderSize <= zlib.avail_in) {
362             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
363               break; // found terminating NULL character
364             }
365           }
366         }
367         
368         if (flags & GZIP_HEADER_COMMENT) {
369           gzipHeaderSize++;
370           while (gzipHeaderSize <= zlib.avail_in) {
371             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
372               break; // found terminating NULL character
373             }
374           }
375         }
376         
377         if (flags & GZIP_HEADER_CRC) {
378           gzipHeaderSize += 2;
379         }
380         
381         if (zlib.avail_in < gzipHeaderSize) {
382           return false; // need more header bytes
383         }
384         
385         zlib.next_in += gzipHeaderSize;
386         zlib.avail_in -= gzipHeaderSize;
387       // now we've processed the GZip header, can decode as deflate
388         contentGZip = false;
389         contentDeflate = true;
390         return true;
391     }
392     
393     virtual void foundTerminator(void)
394     {
395         idleTime.stamp();
396         switch (state) {
397         case STATE_IDLE:
398             beginResponse();
399             break;
400             
401         case STATE_GETTING_HEADERS:
402             processHeader();
403             buffer.clear();
404             break;
405             
406         case STATE_GETTING_BODY:
407             responseComplete();
408             break;
409         
410         case STATE_GETTING_CHUNKED:
411             processChunkHeader();
412             break;
413             
414         case STATE_GETTING_CHUNKED_BYTES:
415             setTerminator("\r\n");
416             state = STATE_GETTING_CHUNKED;
417             buffer.clear();
418             break;
419             
420
421         case STATE_GETTING_TRAILER:
422             processTrailer();
423             buffer.clear();
424             break;
425         
426         default:
427             break;
428         }
429     }
430     
431     bool hasIdleTimeout() const
432     {
433         if (state != STATE_IDLE) {
434             return false;
435         }
436         
437         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
438     }
439   
440     bool hasErrorTimeout() const
441     {
442       if (state == STATE_IDLE) {
443         return false;
444       }
445       
446       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
447     }
448     
449     bool hasError() const
450     {
451         return (state == STATE_SOCKET_ERROR);
452     }
453     
454     bool shouldStartNext() const
455     {
456       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
457     }
458     
459     bool isActive() const
460     {
461         return !queuedRequests.empty() || !sentRequests.empty();
462     }
463 private:
464     bool connectToHost()
465     {
466         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
467         
468         if (!open()) {
469             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
470             return false;
471         }
472         
473         if (connect(host.c_str(), port) != 0) {
474             return false;
475         }
476         
477         return true;
478     }
479     
480     
481     void processHeader()
482     {
483         string h = strutils::simplify(buffer);
484         if (h.empty()) { // blank line terminates headers
485             headersComplete();
486             
487             if (contentGZip || contentDeflate) {
488                 memset(&zlib, 0, sizeof(z_stream));
489                 if (!zlibOutputBuffer) {
490                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
491                 }
492             
493                 // NULLs means we'll get default alloc+free methods
494                 // which is absolutely fine
495                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
496                 zlib.next_out = zlibOutputBuffer;
497                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
498                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
499                 }
500             }
501           
502             if (chunkedTransfer) {
503                 state = STATE_GETTING_CHUNKED;
504             } else if (noMessageBody || (bodyTransferSize == 0)) {
505                 // force the state to GETTING_BODY, to simplify logic in
506                 // responseComplete and handleClose
507                 state = STATE_GETTING_BODY;
508                 responseComplete();
509             } else {
510                 setByteCount(bodyTransferSize); // may be -1, that's fine              
511                 state = STATE_GETTING_BODY;
512             }
513             
514             return;
515         }
516               
517         int colonPos = buffer.find(':');
518         if (colonPos < 0) {
519             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
520             return;
521         }
522         
523         string key = strutils::simplify(buffer.substr(0, colonPos));
524         string lkey = boost::to_lower_copy(key);
525         string value = strutils::strip(buffer.substr(colonPos + 1));
526         
527         // only consider these if getting headers (as opposed to trailers 
528         // of a chunked transfer)
529         if (state == STATE_GETTING_HEADERS) {
530             if (lkey == "content-length") {
531
532                 int sz = strutils::to_int(value);
533                 if (bodyTransferSize <= 0) {
534                     bodyTransferSize = sz;
535                 }
536                 activeRequest->setResponseLength(sz);
537             } else if (lkey == "transfer-length") {
538                 bodyTransferSize = strutils::to_int(value);
539             } else if (lkey == "transfer-encoding") {
540                 processTransferEncoding(value);
541             } else if (lkey == "content-encoding") {
542               if (value == "gzip") {
543                 contentGZip = true;
544               } else if (value == "deflate") {
545                 contentDeflate = true;
546               } else if (value != "identity") {
547                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
548               }
549             }
550         }
551     
552         activeRequest->responseHeader(lkey, value);
553     }
554     
555     void processTransferEncoding(const string& te)
556     {
557         if (te == "chunked") {
558             chunkedTransfer = true;
559         } else {
560             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
561             // failure
562         }
563     }
564     
565     void processChunkHeader()
566     {
567         if (buffer.empty()) {
568             // blank line after chunk data
569             return;
570         }
571                 
572         int chunkSize = 0;
573         int semiPos = buffer.find(';');
574         if (semiPos >= 0) {
575             // extensions ignored for the moment
576             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
577         } else {
578             chunkSize = strutils::to_int(buffer, 16);
579         }
580         
581         buffer.clear();
582         if (chunkSize == 0) {  //  trailer start
583             state = STATE_GETTING_TRAILER;
584             return;
585         }
586         
587         state = STATE_GETTING_CHUNKED_BYTES;
588         setByteCount(chunkSize);
589     }
590     
591     void processTrailer()
592     {        
593         if (buffer.empty()) {
594             // end of trailers
595             responseComplete();
596             return;
597         }
598         
599     // process as a normal header
600         processHeader();
601     }
602     
603     void headersComplete()
604     {
605         activeRequest->responseHeadersComplete();
606     }
607     
608     void responseComplete()
609     {
610       //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
611         activeRequest->responseComplete();
612         client->requestFinished(this);
613       
614         if (contentDeflate) {
615           inflateEnd(&zlib);
616         }
617       
618         assert(sentRequests.front() == activeRequest);
619         sentRequests.pop_front();
620         bool doClose = activeRequest->closeAfterComplete();
621         activeRequest = NULL;
622       
623         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
624             if (doClose) {
625           // this will bring us into handleClose() above, which updates
626           // state to STATE_CLOSED
627             close();
628               
629           // if we have additional requests waiting, try to start them now
630             tryStartNextRequest();
631           }
632         }
633         
634       if (state != STATE_CLOSED)  {
635         state = STATE_IDLE;
636       }
637       setTerminator("\r\n");
638     }
639     
640     enum ConnectionState {
641         STATE_IDLE = 0,
642         STATE_GETTING_HEADERS,
643         STATE_GETTING_BODY,
644         STATE_GETTING_CHUNKED,
645         STATE_GETTING_CHUNKED_BYTES,
646         STATE_GETTING_TRAILER,
647         STATE_SOCKET_ERROR,
648         STATE_CLOSED             ///< connection should be closed now
649     };
650     
651     Client* client;
652     Request_ptr activeRequest;
653     ConnectionState state;
654     string host;
655     short port;
656     std::string buffer;
657     int bodyTransferSize;
658     SGTimeStamp idleTime;
659     bool chunkedTransfer;
660     bool noMessageBody;
661     int requestBodyBytesToSend;
662   
663     z_stream zlib;
664     unsigned char* zlibInflateBuffer;
665     int zlibInflateBufferSize;
666     unsigned char* zlibOutputBuffer;
667     bool contentGZip, contentDeflate;
668   
669     std::list<Request_ptr> queuedRequests;
670     std::list<Request_ptr> sentRequests;
671 };
672
673 Client::Client()
674 {
675     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
676 }
677
678 void Client::update(int waitTimeout)
679 {
680     NetChannel::poll(waitTimeout);
681         
682     ConnectionDict::iterator it = _connections.begin();
683     for (; it != _connections.end(); ) {
684         if (it->second->hasIdleTimeout() || it->second->hasError() ||
685             it->second->hasErrorTimeout())
686         {
687         // connection has been idle for a while, clean it up
688         // (or has an error condition, again clean it up)
689             ConnectionDict::iterator del = it++;
690             delete del->second;
691             _connections.erase(del);
692         } else {
693             if (it->second->shouldStartNext()) {
694                 it->second->tryStartNextRequest();
695             }
696             
697             ++it;
698         }
699     } // of connecion iteration
700 }
701
702 void Client::makeRequest(const Request_ptr& r)
703 {
704     string host = r->host();
705     int port = r->port();
706     if (!_proxy.empty()) {
707         host = _proxy;
708         port = _proxyPort;
709     }
710     
711     stringstream ss;
712     ss << host << "-" << port;
713     string connectionId = ss.str();
714     
715     if (_connections.find(connectionId) == _connections.end()) {
716         Connection* con = new Connection(this);
717         con->setServer(host, port);
718         _connections[connectionId] = con;
719     }
720     
721     _connections[connectionId]->queueRequest(r);
722 }
723
724 void Client::requestFinished(Connection* con)
725 {
726     
727 }
728
729 void Client::setUserAgent(const string& ua)
730 {
731     _userAgent = ua;
732 }
733
734 void Client::setProxy(const string& proxy, int port, const string& auth)
735 {
736     _proxy = proxy;
737     _proxyPort = port;
738     _proxyAuth = auth;
739 }
740
741 bool Client::hasActiveRequests() const
742 {
743     ConnectionDict::const_iterator it = _connections.begin();
744     for (; it != _connections.end(); ++it) {
745         if (it->second->isActive()) return true;
746     }
747     
748     return false;
749 }
750
751 } // of namespace HTTP
752
753 } // of namespace simgear