]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
71f03e372405a70bcab1a92c1f47f4b4e89b7255
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7 #include <errno.h>
8
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
11
12 #include <zlib.h>
13
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
20
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
22 #include "version.h"
23 #else
24 #  if !defined(SIMGEAR_VERSION)
25 #    define SIMGEAR_VERSION "simgear-development"
26 #  endif
27 #endif
28
29 using std::string;
30 using std::stringstream;
31 using std::vector;
32
33 namespace simgear
34 {
35
36 namespace HTTP
37 {
38
39 extern const int DEFAULT_HTTP_PORT = 80;
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
44   
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic 
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
50 const int GZIP_HEADER_SIZE = 10;
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
55   
56 class Connection : public NetChat
57 {
58 public:
59     Connection(Client* pr) :
60         client(pr),
61         state(STATE_CLOSED),
62         port(DEFAULT_HTTP_PORT),
63         zlibInflateBuffer(NULL),
64         zlibInflateBufferSize(0),
65         zlibOutputBuffer(NULL)
66     {
67         
68     }
69     
70     virtual ~Connection()
71     {
72       if (zlibInflateBuffer) {
73         free(zlibInflateBuffer);
74       }
75       
76       if (zlibOutputBuffer) {
77         free(zlibOutputBuffer);
78       }
79     }
80   
81     void setServer(const string& h, short p)
82     {
83         host = h;
84         port = p;
85     }
86     
87     // socket-level errors
88     virtual void handleError(int error)
89     {
90         if (error == ENOENT) {
91         // name lookup failure
92             // we won't have an active request yet, so the logic below won't
93             // fire to actually call setFailure. Let's fail all of the requests
94             BOOST_FOREACH(Request_ptr req, sentRequests) {
95                 req->setFailure(error, "hostname lookup failure");
96             }
97             
98             BOOST_FOREACH(Request_ptr req, queuedRequests) {
99                 req->setFailure(error, "hostname lookup failure");
100             }
101             
102         // name lookup failure, abandon all requests on this connection
103             sentRequests.clear();
104             queuedRequests.clear();
105         }
106         
107         NetChat::handleError(error);
108         if (activeRequest) {            
109             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
110             activeRequest->setFailure(error, "socket error");
111             activeRequest = NULL;
112         }
113     
114         state = STATE_SOCKET_ERROR;
115     }
116     
117     virtual void handleClose()
118     {      
119         NetChat::handleClose();
120         
121         if ((state == STATE_GETTING_BODY) && activeRequest) {
122         // force state here, so responseComplete can avoid closing the 
123         // socket again
124             state =  STATE_CLOSED;
125             responseComplete();
126         } else {
127             state = STATE_CLOSED;
128         }
129       
130       if (sentRequests.empty()) {
131         return;
132       }
133       
134     // restore sent requests to the queue, so they will be re-sent
135     // when the connection opens again
136       queuedRequests.insert(queuedRequests.begin(),
137                               sentRequests.begin(), sentRequests.end());
138       sentRequests.clear();
139     }
140     
141     void queueRequest(const Request_ptr& r)
142     {
143       queuedRequests.push_back(r);
144       tryStartNextRequest();
145     }
146     
147     void beginResponse()
148     {
149       assert(!sentRequests.empty());
150       
151       activeRequest = sentRequests.front();      
152       activeRequest->responseStart(buffer);
153       state = STATE_GETTING_HEADERS;
154       buffer.clear();
155       if (activeRequest->responseCode() == 204) {
156         noMessageBody = true;
157       } else if (activeRequest->method() == "HEAD") {
158         noMessageBody = true;
159       } else {
160         noMessageBody = false;
161       }
162
163       bodyTransferSize = -1;
164       chunkedTransfer = false;
165       contentGZip = contentDeflate = false;
166     }
167   
168     void tryStartNextRequest()
169     {
170       if (queuedRequests.empty()) {
171         idleTime.stamp();
172         return;
173       }
174       
175       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
176         return;
177       }
178       
179       if (state == STATE_CLOSED) {
180           if (!connectToHost()) {
181               return;
182           }
183           
184           setTerminator("\r\n");
185           state = STATE_IDLE;
186       }
187      
188       Request_ptr r = queuedRequests.front();
189       requestBodyBytesToSend = r->requestBodyLength();
190           
191       stringstream headerData;
192       string path = r->path();
193       assert(!path.empty());
194       string query = r->query();
195       string bodyData;
196       
197       if (!client->proxyHost().empty()) {
198           path = r->scheme() + "://" + r->host() + r->path();
199       }
200
201       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
202           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
203           bodyData = query.substr(1); // URL-encode, drop the leading '?'
204           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
205           headerData << "Content-Length:" << bodyData.size() << "\r\n";
206       } else {
207           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
208           if (requestBodyBytesToSend >= 0) {
209             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
210             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
211           }
212       }
213       
214       headerData << "Host: " << r->hostAndPort() << "\r\n";
215       headerData << "User-Agent:" << client->userAgent() << "\r\n";
216       headerData << "Accept-Encoding: deflate, gzip\r\n";
217       if (!client->proxyAuth().empty()) {
218           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
219       }
220
221       BOOST_FOREACH(string h, r->requestHeaders()) {
222           headerData << h << ": " << r->header(h) << "\r\n";
223       }
224
225       headerData << "\r\n"; // final CRLF to terminate the headers
226       if (!bodyData.empty()) {
227           headerData << bodyData;
228       }
229       
230       bool ok = push(headerData.str().c_str());
231       if (!ok) {
232           // we've over-stuffed the socket, give up for now, let things
233           // drain down before trying to start any more requests.
234           return;
235       }
236       
237       while (requestBodyBytesToSend > 0) {
238         char buf[4096];
239         int len = 4096;
240         r->getBodyData(buf, len);
241         if (len > 0) {
242           requestBodyBytesToSend -= len;
243           if (!bufferSend(buf, len)) {
244             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
245             state = STATE_SOCKET_ERROR;
246             return;
247           }
248         } else {
249           SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
250           break;
251         }
252       }
253       
254       //std::cout << "did send request:" << r->url() << std::endl;
255     // successfully sent, remove from queue, and maybe send the next
256       queuedRequests.pop_front();
257       sentRequests.push_back(r);
258       
259     // pipelining, let's maybe send the next request right away
260       tryStartNextRequest();
261     }
262     
263     virtual void collectIncomingData(const char* s, int n)
264     {
265         idleTime.stamp();
266         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
267           if (contentGZip || contentDeflate) {
268             expandCompressedData(s, n);
269           } else {
270             activeRequest->processBodyBytes(s, n);
271           }
272         } else {
273             buffer += string(s, n);
274         }
275     }
276
277     
278     void expandCompressedData(const char* s, int n)
279     {
280       int reqSize = n + zlib.avail_in;
281       if (reqSize > zlibInflateBufferSize) {
282       // reallocate
283         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
284         memcpy(newBuf, zlib.next_in, zlib.avail_in);
285         memcpy(newBuf + zlib.avail_in, s, n);
286         free(zlibInflateBuffer);
287         zlibInflateBuffer = newBuf;
288         zlibInflateBufferSize = reqSize;
289       } else {
290       // important to use memmove here, since it's very likely
291       // the source and destination ranges overlap
292         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
293         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
294       }
295             
296       zlib.next_in = (unsigned char*) zlibInflateBuffer;
297       zlib.avail_in = reqSize;
298       zlib.next_out = zlibOutputBuffer;
299       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
300       
301       if (contentGZip) {
302         // we clear this down to contentDeflate once the GZip header has been seen
303         if (reqSize < GZIP_HEADER_SIZE) {
304           return; // need more header bytes
305         }
306         
307         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
308             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
309             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
310         {
311           return; // invalid GZip header
312         }
313         
314         char flags = zlibInflateBuffer[3];
315         int gzipHeaderSize =  GZIP_HEADER_SIZE;
316         if (flags & GZIP_HEADER_FEXTRA) {
317           gzipHeaderSize += 2;
318           if (reqSize < gzipHeaderSize) {
319             return; // need more header bytes
320           }
321           
322           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
323           if ( sgIsBigEndian() ) {
324               sgEndianSwap( &extraHeaderBytes );
325           }
326           
327           gzipHeaderSize += extraHeaderBytes;
328           if (reqSize < gzipHeaderSize) {
329             return; // need more header bytes
330           }
331         }
332         
333         if (flags & GZIP_HEADER_FNAME) {
334           gzipHeaderSize++;
335           while (gzipHeaderSize <= reqSize) {
336             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
337               break; // found terminating NULL character
338             }
339           }
340         }
341         
342         if (flags & GZIP_HEADER_COMMENT) {
343           gzipHeaderSize++;
344           while (gzipHeaderSize <= reqSize) {
345             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
346               break; // found terminating NULL character
347             }
348           }
349         }
350         
351         if (flags & GZIP_HEADER_CRC) {
352           gzipHeaderSize += 2;
353         }
354         
355         if (reqSize < gzipHeaderSize) {
356           return; // need more header bytes
357         }
358         
359         zlib.next_in += gzipHeaderSize;
360         zlib.avail_in = reqSize - gzipHeaderSize;
361       // now we've processed the GZip header, can decode as deflate
362         contentGZip = false;
363         contentDeflate = true;
364       }
365       
366       int writtenSize = 0;
367       do {
368         int result = inflate(&zlib, Z_NO_FLUSH);
369         if (result == Z_OK || result == Z_STREAM_END) {
370               
371         } else {
372           SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
373           return;
374         }
375             
376         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
377       } while ((writtenSize == 0) && (zlib.avail_in > 0));
378     
379       if (writtenSize > 0) {
380         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
381       }
382     }
383     
384     virtual void foundTerminator(void)
385     {
386         idleTime.stamp();
387         switch (state) {
388         case STATE_IDLE:
389             beginResponse();
390             break;
391             
392         case STATE_GETTING_HEADERS:
393             processHeader();
394             buffer.clear();
395             break;
396             
397         case STATE_GETTING_BODY:
398             responseComplete();
399             break;
400         
401         case STATE_GETTING_CHUNKED:
402             processChunkHeader();
403             break;
404             
405         case STATE_GETTING_CHUNKED_BYTES:
406             setTerminator("\r\n");
407             state = STATE_GETTING_CHUNKED;
408             break;
409             
410
411         case STATE_GETTING_TRAILER:
412             processTrailer();
413             buffer.clear();
414             break;
415         
416         default:
417             break;
418         }
419     }
420     
421     bool hasIdleTimeout() const
422     {
423         if (state != STATE_IDLE) {
424             return false;
425         }
426         
427         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
428     }
429   
430     bool hasErrorTimeout() const
431     {
432       if (state == STATE_IDLE) {
433         return false;
434       }
435       
436       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
437     }
438     
439     bool hasError() const
440     {
441         return (state == STATE_SOCKET_ERROR);
442     }
443     
444     bool shouldStartNext() const
445     {
446       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
447     }
448 private:
449     bool connectToHost()
450     {
451         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
452         
453         if (!open()) {
454             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
455             return false;
456         }
457         
458         if (connect(host.c_str(), port) != 0) {
459             return false;
460         }
461         
462         return true;
463     }
464     
465     
466     void processHeader()
467     {
468         string h = strutils::simplify(buffer);
469         if (h.empty()) { // blank line terminates headers
470             headersComplete();
471             
472             if (contentGZip || contentDeflate) {
473                 memset(&zlib, 0, sizeof(z_stream));
474                 if (!zlibOutputBuffer) {
475                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
476                 }
477             
478                 // NULLs means we'll get default alloc+free methods
479                 // which is absolutely fine
480                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
481                 zlib.next_out = zlibOutputBuffer;
482                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
483                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
484                 }
485             }
486           
487             if (chunkedTransfer) {
488                 state = STATE_GETTING_CHUNKED;
489             } else if (noMessageBody || (bodyTransferSize == 0)) {
490                 // force the state to GETTING_BODY, to simplify logic in
491                 // responseComplete and handleClose
492                 state = STATE_GETTING_BODY;
493                 responseComplete();
494             } else {
495                 setByteCount(bodyTransferSize); // may be -1, that's fine              
496                 state = STATE_GETTING_BODY;
497             }
498             
499             return;
500         }
501               
502         int colonPos = buffer.find(':');
503         if (colonPos < 0) {
504             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
505             return;
506         }
507         
508         string key = strutils::simplify(buffer.substr(0, colonPos));
509         string lkey = boost::to_lower_copy(key);
510         string value = strutils::strip(buffer.substr(colonPos + 1));
511         
512         // only consider these if getting headers (as opposed to trailers 
513         // of a chunked transfer)
514         if (state == STATE_GETTING_HEADERS) {
515             if (lkey == "content-length") {
516
517                 int sz = strutils::to_int(value);
518                 if (bodyTransferSize <= 0) {
519                     bodyTransferSize = sz;
520                 }
521                 activeRequest->setResponseLength(sz);
522             } else if (lkey == "transfer-length") {
523                 bodyTransferSize = strutils::to_int(value);
524             } else if (lkey == "transfer-encoding") {
525                 processTransferEncoding(value);
526             } else if (lkey == "content-encoding") {
527               if (value == "gzip") {
528                 contentGZip = true;
529               } else if (value == "deflate") {
530                 contentDeflate = true;
531               } else if (value != "identity") {
532                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
533               }
534             }
535         }
536     
537         activeRequest->responseHeader(lkey, value);
538     }
539     
540     void processTransferEncoding(const string& te)
541     {
542         if (te == "chunked") {
543             chunkedTransfer = true;
544         } else {
545             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
546             // failure
547         }
548     }
549     
550     void processChunkHeader()
551     {
552         if (buffer.empty()) {
553             // blank line after chunk data
554             return;
555         }
556         
557         int chunkSize = 0;
558         int semiPos = buffer.find(';');
559         if (semiPos >= 0) {
560             // extensions ignored for the moment
561             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
562         } else {
563             chunkSize = strutils::to_int(buffer, 16);
564         }
565         
566         buffer.clear();
567         if (chunkSize == 0) {  //  trailer start
568             state = STATE_GETTING_TRAILER;
569             return;
570         }
571         
572         state = STATE_GETTING_CHUNKED_BYTES;
573         setByteCount(chunkSize);
574     }
575     
576     void processTrailer()
577     {        
578         if (buffer.empty()) {
579             // end of trailers
580             responseComplete();
581             return;
582         }
583         
584     // process as a normal header
585         processHeader();
586     }
587     
588     void headersComplete()
589     {
590         activeRequest->responseHeadersComplete();
591     }
592     
593     void responseComplete()
594     {
595       //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
596         activeRequest->responseComplete();
597         client->requestFinished(this);
598       
599         if (contentDeflate) {
600           inflateEnd(&zlib);
601         }
602       
603         assert(sentRequests.front() == activeRequest);
604         sentRequests.pop_front();
605         bool doClose = activeRequest->closeAfterComplete();
606         activeRequest = NULL;
607       
608         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
609             if (doClose) {
610           // this will bring us into handleClose() above, which updates
611           // state to STATE_CLOSED
612             close();
613               
614           // if we have additional requests waiting, try to start them now
615             tryStartNextRequest();
616           }
617         }
618         
619       if (state != STATE_CLOSED)  {
620         state = STATE_IDLE;
621       }
622       setTerminator("\r\n");
623     }
624     
625     enum ConnectionState {
626         STATE_IDLE = 0,
627         STATE_GETTING_HEADERS,
628         STATE_GETTING_BODY,
629         STATE_GETTING_CHUNKED,
630         STATE_GETTING_CHUNKED_BYTES,
631         STATE_GETTING_TRAILER,
632         STATE_SOCKET_ERROR,
633         STATE_CLOSED             ///< connection should be closed now
634     };
635     
636     Client* client;
637     Request_ptr activeRequest;
638     ConnectionState state;
639     string host;
640     short port;
641     std::string buffer;
642     int bodyTransferSize;
643     SGTimeStamp idleTime;
644     bool chunkedTransfer;
645     bool noMessageBody;
646     int requestBodyBytesToSend;
647   
648     z_stream zlib;
649     unsigned char* zlibInflateBuffer;
650     int zlibInflateBufferSize;
651     unsigned char* zlibOutputBuffer;
652     bool contentGZip, contentDeflate;
653   
654     std::list<Request_ptr> queuedRequests;
655     std::list<Request_ptr> sentRequests;
656 };
657
658 Client::Client()
659 {
660     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
661 }
662
663 void Client::update(int waitTimeout)
664 {
665     NetChannel::poll(waitTimeout);
666         
667     ConnectionDict::iterator it = _connections.begin();
668     for (; it != _connections.end(); ) {
669         if (it->second->hasIdleTimeout() || it->second->hasError() ||
670             it->second->hasErrorTimeout())
671         {
672         // connection has been idle for a while, clean it up
673         // (or has an error condition, again clean it up)
674             ConnectionDict::iterator del = it++;
675             delete del->second;
676             _connections.erase(del);
677         } else {
678             if (it->second->shouldStartNext()) {
679                 it->second->tryStartNextRequest();
680             }
681             
682             ++it;
683         }
684     } // of connecion iteration
685 }
686
687 void Client::makeRequest(const Request_ptr& r)
688 {
689     string host = r->host();
690     int port = r->port();
691     if (!_proxy.empty()) {
692         host = _proxy;
693         port = _proxyPort;
694     }
695     
696     stringstream ss;
697     ss << host << "-" << port;
698     string connectionId = ss.str();
699     
700     if (_connections.find(connectionId) == _connections.end()) {
701         Connection* con = new Connection(this);
702         con->setServer(host, port);
703         _connections[connectionId] = con;
704     }
705     
706     _connections[connectionId]->queueRequest(r);
707 }
708
709 void Client::requestFinished(Connection* con)
710 {
711     
712 }
713
714 void Client::setUserAgent(const string& ua)
715 {
716     _userAgent = ua;
717 }
718
719 void Client::setProxy(const string& proxy, int port, const string& auth)
720 {
721     _proxy = proxy;
722     _proxyPort = port;
723     _proxyAuth = auth;
724 }
725
726 } // of namespace HTTP
727
728 } // of namespace simgear