]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
hla: Use HLADataElementIndices for HLAInteractionClass.
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7
8 #include <boost/foreach.hpp>
9 #include <boost/algorithm/string/case_conv.hpp>
10
11 #include <zlib.h>
12
13 #include <simgear/io/sg_netChat.hxx>
14 #include <simgear/io/lowlevel.hxx>
15 #include <simgear/misc/strutils.hxx>
16 #include <simgear/compiler.h>
17 #include <simgear/debug/logstream.hxx>
18 #include <simgear/timing/timestamp.hxx>
19
20 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
21 #include "version.h"
22 #else
23 #  if !defined(SIMGEAR_VERSION)
24 #    define SIMGEAR_VERSION "simgear-development"
25 #  endif
26 #endif
27
28 using std::string;
29 using std::stringstream;
30 using std::vector;
31
32 namespace simgear
33 {
34
35 namespace HTTP
36 {
37
38 extern const int DEFAULT_HTTP_PORT = 80;
39 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
40 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
41 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
42 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
43   
44 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
45 // detailed description of the logic 
46 const int GZIP_HEADER_ID1 = 31;
47 const int GZIP_HEADER_ID2 = 139;
48 const int GZIP_HEADER_METHOD_DEFLATE = 8;
49 const int GZIP_HEADER_SIZE = 10;
50 const int GZIP_HEADER_FEXTRA = 1 << 2;
51 const int GZIP_HEADER_FNAME = 1 << 3;
52 const int GZIP_HEADER_COMMENT = 1 << 4;
53 const int GZIP_HEADER_CRC = 1 << 1;
54   
55 class Connection : public NetChat
56 {
57 public:
58     Connection(Client* pr) :
59         client(pr),
60         state(STATE_CLOSED),
61         port(DEFAULT_HTTP_PORT),
62         zlibInflateBuffer(NULL),
63         zlibInflateBufferSize(0),
64         zlibOutputBuffer(NULL)
65     {
66         
67     }
68     
69     virtual ~Connection()
70     {
71       if (zlibInflateBuffer) {
72         free(zlibInflateBuffer);
73       }
74       
75       if (zlibOutputBuffer) {
76         free(zlibOutputBuffer);
77       }
78     }
79   
80     void setServer(const string& h, short p)
81     {
82         host = h;
83         port = p;
84     }
85     
86     // socket-level errors
87     virtual void handleError(int error)
88     {        
89         NetChat::handleError(error);
90         if (activeRequest) {
91             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
92             activeRequest->setFailure(error, "socket error");
93             activeRequest = NULL;
94         }
95     
96         state = STATE_SOCKET_ERROR;
97     }
98     
99     virtual void handleClose()
100     {      
101         NetChat::handleClose();
102         
103         if ((state == STATE_GETTING_BODY) && activeRequest) {
104         // force state here, so responseComplete can avoid closing the 
105         // socket again
106             state =  STATE_CLOSED;
107             responseComplete();
108         } else {
109             state = STATE_CLOSED;
110         }
111       
112       if (sentRequests.empty()) {
113         return;
114       }
115       
116     // restore sent requests to the queue, so they will be re-sent
117     // when the connection opens again
118       queuedRequests.insert(queuedRequests.begin(),
119                               sentRequests.begin(), sentRequests.end());
120       sentRequests.clear();
121     }
122     
123     void queueRequest(const Request_ptr& r)
124     {
125       queuedRequests.push_back(r);
126       tryStartNextRequest();
127     }
128     
129     void beginResponse()
130     {
131       assert(!sentRequests.empty());
132       
133       activeRequest = sentRequests.front();
134       activeRequest->responseStart(buffer);
135       state = STATE_GETTING_HEADERS;
136       buffer.clear();
137       if (activeRequest->responseCode() == 204) {
138         noMessageBody = true;
139       } else if (activeRequest->method() == "HEAD") {
140         noMessageBody = true;
141       } else {
142         noMessageBody = false;
143       }
144
145       bodyTransferSize = -1;
146       chunkedTransfer = false;
147       contentGZip = contentDeflate = false;
148     }
149   
150     void tryStartNextRequest()
151     {
152       if (queuedRequests.empty()) {
153         idleTime.stamp();
154         return;
155       }
156       
157       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
158         return;
159       }
160       
161       if (state == STATE_CLOSED) {
162           if (!connectToHost()) {
163               return;
164           }
165           
166           setTerminator("\r\n");
167           state = STATE_IDLE;
168       }
169      
170       Request_ptr r = queuedRequests.front();
171       requestBodyBytesToSend = r->requestBodyLength();
172     
173       stringstream headerData;
174       string path = r->path();
175       string query = r->query();
176       string bodyData;
177       
178       if (!client->proxyHost().empty()) {
179           path = r->scheme() + "://" + r->host() + r->path();
180       }
181
182       if (r->method() == "POST") {
183           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
184           bodyData = query.substr(1); // URL-encode, drop the leading '?'
185           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
186           headerData << "Content-Length:" << bodyData.size() << "\r\n";
187       } else {
188           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
189           if (requestBodyBytesToSend >= 0) {
190             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
191             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
192           }
193       }
194       
195       headerData << "Host: " << r->hostAndPort() << "\r\n";
196       headerData << "User-Agent:" << client->userAgent() << "\r\n";
197       headerData << "Accept-Encoding: deflate, gzip\r\n";
198       if (!client->proxyAuth().empty()) {
199           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
200       }
201
202       BOOST_FOREACH(string h, r->requestHeaders()) {
203           headerData << h << ": " << r->header(h) << "\r\n";
204       }
205
206       headerData << "\r\n"; // final CRLF to terminate the headers
207       if (!bodyData.empty()) {
208           headerData << bodyData;
209       }
210       
211       bool ok = push(headerData.str().c_str());
212       if (!ok) {
213           // we've over-stuffed the socket, give up for now, let things
214           // drain down before trying to start any more requests.
215           return;
216       }
217       
218       while (requestBodyBytesToSend > 0) {
219         char buf[4096];
220         int len = 4096;
221         r->getBodyData(buf, len);
222         if (len > 0) {
223           requestBodyBytesToSend -= len;
224           if (!bufferSend(buf, len)) {
225             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
226             state = STATE_SOCKET_ERROR;
227             return;
228           }
229         } else {
230           SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
231           break;
232         }
233       }
234       
235       //std::cout << "did send request:" << r->url() << std::endl;
236     // successfully sent, remove from queue, and maybe send the next
237       queuedRequests.pop_front();
238       sentRequests.push_back(r);
239       
240     // pipelining, let's maybe send the next request right away
241       tryStartNextRequest();
242     }
243     
244     virtual void collectIncomingData(const char* s, int n)
245     {
246         idleTime.stamp();
247         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
248           if (contentGZip || contentDeflate) {
249             expandCompressedData(s, n);
250           } else {
251             activeRequest->processBodyBytes(s, n);
252           }
253         } else {
254             buffer += string(s, n);
255         }
256     }
257
258     
259     void expandCompressedData(const char* s, int n)
260     {
261       int reqSize = n + zlib.avail_in;
262       if (reqSize > zlibInflateBufferSize) {
263       // reallocate
264         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
265         memcpy(newBuf, zlib.next_in, zlib.avail_in);
266         memcpy(newBuf + zlib.avail_in, s, n);
267         free(zlibInflateBuffer);
268         zlibInflateBuffer = newBuf;
269         zlibInflateBufferSize = reqSize;
270       } else {
271       // important to use memmove here, since it's very likely
272       // the source and destination ranges overlap
273         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
274         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
275       }
276             
277       zlib.next_in = (unsigned char*) zlibInflateBuffer;
278       zlib.avail_in = reqSize;
279       zlib.next_out = zlibOutputBuffer;
280       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
281       
282       if (contentGZip) {
283         // we clear this down to contentDeflate once the GZip header has been seen
284         if (reqSize < GZIP_HEADER_SIZE) {
285           return; // need more header bytes
286         }
287         
288         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
289             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
290             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
291         {
292           return; // invalid GZip header
293         }
294         
295         char flags = zlibInflateBuffer[3];
296         int gzipHeaderSize =  GZIP_HEADER_SIZE;
297         if (flags & GZIP_HEADER_FEXTRA) {
298           gzipHeaderSize += 2;
299           if (reqSize < gzipHeaderSize) {
300             return; // need more header bytes
301           }
302           
303           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
304           if ( sgIsBigEndian() ) {
305               sgEndianSwap( &extraHeaderBytes );
306           }
307           
308           gzipHeaderSize += extraHeaderBytes;
309           if (reqSize < gzipHeaderSize) {
310             return; // need more header bytes
311           }
312         }
313         
314         if (flags & GZIP_HEADER_FNAME) {
315           gzipHeaderSize++;
316           while (gzipHeaderSize <= reqSize) {
317             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
318               break; // found terminating NULL character
319             }
320           }
321         }
322         
323         if (flags & GZIP_HEADER_COMMENT) {
324           gzipHeaderSize++;
325           while (gzipHeaderSize <= reqSize) {
326             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
327               break; // found terminating NULL character
328             }
329           }
330         }
331         
332         if (flags & GZIP_HEADER_CRC) {
333           gzipHeaderSize += 2;
334         }
335         
336         if (reqSize < gzipHeaderSize) {
337           return; // need more header bytes
338         }
339         
340         zlib.next_in += gzipHeaderSize;
341         zlib.avail_in = reqSize - gzipHeaderSize;
342       // now we've processed the GZip header, can decode as deflate
343         contentGZip = false;
344         contentDeflate = true;
345       }
346       
347       int writtenSize = 0;
348       do {
349         int result = inflate(&zlib, Z_NO_FLUSH);
350         if (result == Z_OK || result == Z_STREAM_END) {
351               
352         } else {
353           SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
354           return;
355         }
356             
357         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
358       } while ((writtenSize == 0) && (zlib.avail_in > 0));
359     
360       if (writtenSize > 0) {
361         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
362       }
363     }
364     
365     virtual void foundTerminator(void)
366     {
367         idleTime.stamp();
368         switch (state) {
369         case STATE_IDLE:
370             beginResponse();
371             break;
372             
373         case STATE_GETTING_HEADERS:
374             processHeader();
375             buffer.clear();
376             break;
377             
378         case STATE_GETTING_BODY:
379             responseComplete();
380             break;
381         
382         case STATE_GETTING_CHUNKED:
383             processChunkHeader();
384             break;
385             
386         case STATE_GETTING_CHUNKED_BYTES:
387             setTerminator("\r\n");
388             state = STATE_GETTING_CHUNKED;
389             break;
390             
391
392         case STATE_GETTING_TRAILER:
393             processTrailer();
394             buffer.clear();
395             break;
396         
397         default:
398             break;
399         }
400     }
401     
402     bool hasIdleTimeout() const
403     {
404         if (state != STATE_IDLE) {
405             return false;
406         }
407         
408         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
409     }
410   
411     bool hasErrorTimeout() const
412     {
413       if (state == STATE_IDLE) {
414         return false;
415       }
416       
417       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
418     }
419     
420     bool hasError() const
421     {
422         return (state == STATE_SOCKET_ERROR);
423     }
424     
425     bool shouldStartNext() const
426     {
427       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
428     }
429 private:
430     bool connectToHost()
431     {
432         SG_LOG(SG_IO, SG_INFO, "HTTP connecting to " << host << ":" << port);
433         
434         if (!open()) {
435             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
436             return false;
437         }
438         
439         if (connect(host.c_str(), port) != 0) {
440             return false;
441         }
442         
443         return true;
444     }
445     
446     
447     void processHeader()
448     {
449         string h = strutils::simplify(buffer);
450         if (h.empty()) { // blank line terminates headers
451             headersComplete();
452             
453             if (contentGZip || contentDeflate) {
454                 memset(&zlib, 0, sizeof(z_stream));
455                 if (!zlibOutputBuffer) {
456                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
457                 }
458             
459                 // NULLs means we'll get default alloc+free methods
460                 // which is absolutely fine
461                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
462                 zlib.next_out = zlibOutputBuffer;
463                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
464                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
465                 }
466             }
467           
468             if (chunkedTransfer) {
469                 state = STATE_GETTING_CHUNKED;
470             } else if (noMessageBody || (bodyTransferSize == 0)) {
471                 // force the state to GETTING_BODY, to simplify logic in
472                 // responseComplete and handleClose
473                 state = STATE_GETTING_BODY;
474                 responseComplete();
475             } else {
476                 setByteCount(bodyTransferSize); // may be -1, that's fine              
477                 state = STATE_GETTING_BODY;
478             }
479             
480             return;
481         }
482               
483         int colonPos = buffer.find(':');
484         if (colonPos < 0) {
485             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
486             return;
487         }
488         
489         string key = strutils::simplify(buffer.substr(0, colonPos));
490         string lkey = boost::to_lower_copy(key);
491         string value = strutils::strip(buffer.substr(colonPos + 1));
492         
493         // only consider these if getting headers (as opposed to trailers 
494         // of a chunked transfer)
495         if (state == STATE_GETTING_HEADERS) {
496             if (lkey == "content-length") {
497
498                 int sz = strutils::to_int(value);
499                 if (bodyTransferSize <= 0) {
500                     bodyTransferSize = sz;
501                 }
502                 activeRequest->setResponseLength(sz);
503             } else if (lkey == "transfer-length") {
504                 bodyTransferSize = strutils::to_int(value);
505             } else if (lkey == "transfer-encoding") {
506                 processTransferEncoding(value);
507             } else if (lkey == "content-encoding") {
508               if (value == "gzip") {
509                 contentGZip = true;
510               } else if (value == "deflate") {
511                 contentDeflate = true;
512               } else if (value != "identity") {
513                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
514               }
515             }
516         }
517     
518         activeRequest->responseHeader(lkey, value);
519     }
520     
521     void processTransferEncoding(const string& te)
522     {
523         if (te == "chunked") {
524             chunkedTransfer = true;
525         } else {
526             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
527             // failure
528         }
529     }
530     
531     void processChunkHeader()
532     {
533         if (buffer.empty()) {
534             // blank line after chunk data
535             return;
536         }
537         
538         int chunkSize = 0;
539         int semiPos = buffer.find(';');
540         if (semiPos >= 0) {
541             // extensions ignored for the moment
542             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
543         } else {
544             chunkSize = strutils::to_int(buffer, 16);
545         }
546         
547         buffer.clear();
548         if (chunkSize == 0) {  //  trailer start
549             state = STATE_GETTING_TRAILER;
550             return;
551         }
552         
553         state = STATE_GETTING_CHUNKED_BYTES;
554         setByteCount(chunkSize);
555     }
556     
557     void processTrailer()
558     {        
559         if (buffer.empty()) {
560             // end of trailers
561             responseComplete();
562             return;
563         }
564         
565     // process as a normal header
566         processHeader();
567     }
568     
569     void headersComplete()
570     {
571         activeRequest->responseHeadersComplete();
572     }
573     
574     void responseComplete()
575     {
576       //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
577         activeRequest->responseComplete();
578         client->requestFinished(this);
579       
580         if (contentDeflate) {
581           inflateEnd(&zlib);
582         }
583       
584         assert(sentRequests.front() == activeRequest);
585         sentRequests.pop_front();
586         bool doClose = activeRequest->closeAfterComplete();
587         activeRequest = NULL;
588       
589         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
590             if (doClose) {
591           // this will bring us into handleClose() above, which updates
592           // state to STATE_CLOSED
593             close();
594               
595           // if we have additional requests waiting, try to start them now
596             tryStartNextRequest();
597           }
598         }
599         
600       if (state != STATE_CLOSED)  {
601         state = STATE_IDLE;
602       }
603       setTerminator("\r\n");
604     }
605     
606     enum ConnectionState {
607         STATE_IDLE = 0,
608         STATE_GETTING_HEADERS,
609         STATE_GETTING_BODY,
610         STATE_GETTING_CHUNKED,
611         STATE_GETTING_CHUNKED_BYTES,
612         STATE_GETTING_TRAILER,
613         STATE_SOCKET_ERROR,
614         STATE_CLOSED             ///< connection should be closed now
615     };
616     
617     Client* client;
618     Request_ptr activeRequest;
619     ConnectionState state;
620     string host;
621     short port;
622     std::string buffer;
623     int bodyTransferSize;
624     SGTimeStamp idleTime;
625     bool chunkedTransfer;
626     bool noMessageBody;
627     int requestBodyBytesToSend;
628   
629     z_stream zlib;
630     unsigned char* zlibInflateBuffer;
631     int zlibInflateBufferSize;
632     unsigned char* zlibOutputBuffer;
633     bool contentGZip, contentDeflate;
634   
635     std::list<Request_ptr> queuedRequests;
636     std::list<Request_ptr> sentRequests;
637 };
638
639 Client::Client()
640 {
641     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
642 }
643
644 void Client::update(int waitTimeout)
645 {
646     NetChannel::poll(waitTimeout);
647         
648     ConnectionDict::iterator it = _connections.begin();
649     for (; it != _connections.end(); ) {
650         if (it->second->hasIdleTimeout() || it->second->hasError() ||
651             it->second->hasErrorTimeout())
652         {
653         // connection has been idle for a while, clean it up
654         // (or has an error condition, again clean it up)
655             ConnectionDict::iterator del = it++;
656             delete del->second;
657             _connections.erase(del);
658         } else {
659             if (it->second->shouldStartNext()) {
660                 SG_LOG(SG_IO, SG_INFO, "should start next, hmm");
661                 it->second->tryStartNextRequest();
662             }
663             
664             ++it;
665         }
666     } // of connecion iteration
667 }
668
669 void Client::makeRequest(const Request_ptr& r)
670 {
671     string host = r->host();
672     int port = r->port();
673     if (!_proxy.empty()) {
674         host = _proxy;
675         port = _proxyPort;
676     }
677     
678     stringstream ss;
679     ss << host << "-" << port;
680     string connectionId = ss.str();
681     
682     if (_connections.find(connectionId) == _connections.end()) {
683         Connection* con = new Connection(this);
684         con->setServer(host, port);
685         _connections[connectionId] = con;
686     }
687     
688     _connections[connectionId]->queueRequest(r);
689 }
690
691 void Client::requestFinished(Connection* con)
692 {
693     
694 }
695
696 void Client::setUserAgent(const string& ua)
697 {
698     _userAgent = ua;
699 }
700
701 void Client::setProxy(const string& proxy, int port, const string& auth)
702 {
703     _proxy = proxy;
704     _proxyPort = port;
705     _proxyAuth = auth;
706 }
707
708 } // of namespace HTTP
709
710 } // of namespace simgear