]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
ec25011f49ae1aa36624ed452499f21f2d6a5b6f
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7 #include <errno.h>
8
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string/case_conv.hpp>
11
12 #include <zlib.h>
13
14 #include <simgear/io/sg_netChat.hxx>
15 #include <simgear/io/lowlevel.hxx>
16 #include <simgear/misc/strutils.hxx>
17 #include <simgear/compiler.h>
18 #include <simgear/debug/logstream.hxx>
19 #include <simgear/timing/timestamp.hxx>
20
21 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
22 #include "version.h"
23 #else
24 #  if !defined(SIMGEAR_VERSION)
25 #    define SIMGEAR_VERSION "simgear-development"
26 #  endif
27 #endif
28
29 using std::string;
30 using std::stringstream;
31 using std::vector;
32
33 namespace simgear
34 {
35
36 namespace HTTP
37 {
38
39 extern const int DEFAULT_HTTP_PORT = 80;
40 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
41 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
42 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
43 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
44   
45 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
46 // detailed description of the logic 
47 const int GZIP_HEADER_ID1 = 31;
48 const int GZIP_HEADER_ID2 = 139;
49 const int GZIP_HEADER_METHOD_DEFLATE = 8;
50 const int GZIP_HEADER_SIZE = 10;
51 const int GZIP_HEADER_FEXTRA = 1 << 2;
52 const int GZIP_HEADER_FNAME = 1 << 3;
53 const int GZIP_HEADER_COMMENT = 1 << 4;
54 const int GZIP_HEADER_CRC = 1 << 1;
55   
56 class Connection : public NetChat
57 {
58 public:
59     Connection(Client* pr) :
60         client(pr),
61         state(STATE_CLOSED),
62         port(DEFAULT_HTTP_PORT),
63         zlibInflateBuffer(NULL),
64         zlibInflateBufferSize(0),
65         zlibOutputBuffer(NULL)
66     {
67         
68     }
69     
70     virtual ~Connection()
71     {
72       if (zlibInflateBuffer) {
73         free(zlibInflateBuffer);
74       }
75       
76       if (zlibOutputBuffer) {
77         free(zlibOutputBuffer);
78       }
79     }
80   
81     void setServer(const string& h, short p)
82     {
83         host = h;
84         port = p;
85     }
86     
87     // socket-level errors
88     virtual void handleError(int error)
89     {
90         if (error == ENOENT) {
91             // name lookup failure, abandon all requests on this connection
92             sentRequests.clear();
93             queuedRequests.clear();
94         }
95         
96         NetChat::handleError(error);
97         if (activeRequest) {
98             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
99             activeRequest->setFailure(error, "socket error");
100             activeRequest = NULL;
101         }
102     
103         state = STATE_SOCKET_ERROR;
104     }
105     
106     virtual void handleClose()
107     {      
108         NetChat::handleClose();
109         
110         if ((state == STATE_GETTING_BODY) && activeRequest) {
111         // force state here, so responseComplete can avoid closing the 
112         // socket again
113             state =  STATE_CLOSED;
114             responseComplete();
115         } else {
116             state = STATE_CLOSED;
117         }
118       
119       if (sentRequests.empty()) {
120         return;
121       }
122       
123     // restore sent requests to the queue, so they will be re-sent
124     // when the connection opens again
125       queuedRequests.insert(queuedRequests.begin(),
126                               sentRequests.begin(), sentRequests.end());
127       sentRequests.clear();
128     }
129     
130     void queueRequest(const Request_ptr& r)
131     {
132       queuedRequests.push_back(r);
133       tryStartNextRequest();
134     }
135     
136     void beginResponse()
137     {
138       assert(!sentRequests.empty());
139       
140       activeRequest = sentRequests.front();      
141       activeRequest->responseStart(buffer);
142       state = STATE_GETTING_HEADERS;
143       buffer.clear();
144       if (activeRequest->responseCode() == 204) {
145         noMessageBody = true;
146       } else if (activeRequest->method() == "HEAD") {
147         noMessageBody = true;
148       } else {
149         noMessageBody = false;
150       }
151
152       bodyTransferSize = -1;
153       chunkedTransfer = false;
154       contentGZip = contentDeflate = false;
155     }
156   
157     void tryStartNextRequest()
158     {
159       if (queuedRequests.empty()) {
160         idleTime.stamp();
161         return;
162       }
163       
164       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
165         return;
166       }
167       
168       if (state == STATE_CLOSED) {
169           if (!connectToHost()) {
170               return;
171           }
172           
173           setTerminator("\r\n");
174           state = STATE_IDLE;
175       }
176      
177       Request_ptr r = queuedRequests.front();
178       requestBodyBytesToSend = r->requestBodyLength();
179           
180       stringstream headerData;
181       string path = r->path();
182       assert(!path.empty());
183       string query = r->query();
184       string bodyData;
185       
186       if (!client->proxyHost().empty()) {
187           path = r->scheme() + "://" + r->host() + r->path();
188       }
189
190       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
191           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
192           bodyData = query.substr(1); // URL-encode, drop the leading '?'
193           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
194           headerData << "Content-Length:" << bodyData.size() << "\r\n";
195       } else {
196           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
197           if (requestBodyBytesToSend >= 0) {
198             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
199             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
200           }
201       }
202       
203       headerData << "Host: " << r->hostAndPort() << "\r\n";
204       headerData << "User-Agent:" << client->userAgent() << "\r\n";
205       headerData << "Accept-Encoding: deflate, gzip\r\n";
206       if (!client->proxyAuth().empty()) {
207           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
208       }
209
210       BOOST_FOREACH(string h, r->requestHeaders()) {
211           headerData << h << ": " << r->header(h) << "\r\n";
212       }
213
214       headerData << "\r\n"; // final CRLF to terminate the headers
215       if (!bodyData.empty()) {
216           headerData << bodyData;
217       }
218       
219       bool ok = push(headerData.str().c_str());
220       if (!ok) {
221           // we've over-stuffed the socket, give up for now, let things
222           // drain down before trying to start any more requests.
223           return;
224       }
225       
226       while (requestBodyBytesToSend > 0) {
227         char buf[4096];
228         int len = 4096;
229         r->getBodyData(buf, len);
230         if (len > 0) {
231           requestBodyBytesToSend -= len;
232           if (!bufferSend(buf, len)) {
233             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
234             state = STATE_SOCKET_ERROR;
235             return;
236           }
237         } else {
238           SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
239           break;
240         }
241       }
242       
243       //std::cout << "did send request:" << r->url() << std::endl;
244     // successfully sent, remove from queue, and maybe send the next
245       queuedRequests.pop_front();
246       sentRequests.push_back(r);
247       
248     // pipelining, let's maybe send the next request right away
249       tryStartNextRequest();
250     }
251     
252     virtual void collectIncomingData(const char* s, int n)
253     {
254         idleTime.stamp();
255         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
256           if (contentGZip || contentDeflate) {
257             expandCompressedData(s, n);
258           } else {
259             activeRequest->processBodyBytes(s, n);
260           }
261         } else {
262             buffer += string(s, n);
263         }
264     }
265
266     
267     void expandCompressedData(const char* s, int n)
268     {
269       int reqSize = n + zlib.avail_in;
270       if (reqSize > zlibInflateBufferSize) {
271       // reallocate
272         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
273         memcpy(newBuf, zlib.next_in, zlib.avail_in);
274         memcpy(newBuf + zlib.avail_in, s, n);
275         free(zlibInflateBuffer);
276         zlibInflateBuffer = newBuf;
277         zlibInflateBufferSize = reqSize;
278       } else {
279       // important to use memmove here, since it's very likely
280       // the source and destination ranges overlap
281         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
282         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
283       }
284             
285       zlib.next_in = (unsigned char*) zlibInflateBuffer;
286       zlib.avail_in = reqSize;
287       zlib.next_out = zlibOutputBuffer;
288       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
289       
290       if (contentGZip) {
291         // we clear this down to contentDeflate once the GZip header has been seen
292         if (reqSize < GZIP_HEADER_SIZE) {
293           return; // need more header bytes
294         }
295         
296         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
297             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
298             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
299         {
300           return; // invalid GZip header
301         }
302         
303         char flags = zlibInflateBuffer[3];
304         int gzipHeaderSize =  GZIP_HEADER_SIZE;
305         if (flags & GZIP_HEADER_FEXTRA) {
306           gzipHeaderSize += 2;
307           if (reqSize < gzipHeaderSize) {
308             return; // need more header bytes
309           }
310           
311           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
312           if ( sgIsBigEndian() ) {
313               sgEndianSwap( &extraHeaderBytes );
314           }
315           
316           gzipHeaderSize += extraHeaderBytes;
317           if (reqSize < gzipHeaderSize) {
318             return; // need more header bytes
319           }
320         }
321         
322         if (flags & GZIP_HEADER_FNAME) {
323           gzipHeaderSize++;
324           while (gzipHeaderSize <= reqSize) {
325             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
326               break; // found terminating NULL character
327             }
328           }
329         }
330         
331         if (flags & GZIP_HEADER_COMMENT) {
332           gzipHeaderSize++;
333           while (gzipHeaderSize <= reqSize) {
334             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
335               break; // found terminating NULL character
336             }
337           }
338         }
339         
340         if (flags & GZIP_HEADER_CRC) {
341           gzipHeaderSize += 2;
342         }
343         
344         if (reqSize < gzipHeaderSize) {
345           return; // need more header bytes
346         }
347         
348         zlib.next_in += gzipHeaderSize;
349         zlib.avail_in = reqSize - gzipHeaderSize;
350       // now we've processed the GZip header, can decode as deflate
351         contentGZip = false;
352         contentDeflate = true;
353       }
354       
355       int writtenSize = 0;
356       do {
357         int result = inflate(&zlib, Z_NO_FLUSH);
358         if (result == Z_OK || result == Z_STREAM_END) {
359               
360         } else {
361           SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
362           return;
363         }
364             
365         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
366       } while ((writtenSize == 0) && (zlib.avail_in > 0));
367     
368       if (writtenSize > 0) {
369         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
370       }
371     }
372     
373     virtual void foundTerminator(void)
374     {
375         idleTime.stamp();
376         switch (state) {
377         case STATE_IDLE:
378             beginResponse();
379             break;
380             
381         case STATE_GETTING_HEADERS:
382             processHeader();
383             buffer.clear();
384             break;
385             
386         case STATE_GETTING_BODY:
387             responseComplete();
388             break;
389         
390         case STATE_GETTING_CHUNKED:
391             processChunkHeader();
392             break;
393             
394         case STATE_GETTING_CHUNKED_BYTES:
395             setTerminator("\r\n");
396             state = STATE_GETTING_CHUNKED;
397             break;
398             
399
400         case STATE_GETTING_TRAILER:
401             processTrailer();
402             buffer.clear();
403             break;
404         
405         default:
406             break;
407         }
408     }
409     
410     bool hasIdleTimeout() const
411     {
412         if (state != STATE_IDLE) {
413             return false;
414         }
415         
416         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
417     }
418   
419     bool hasErrorTimeout() const
420     {
421       if (state == STATE_IDLE) {
422         return false;
423       }
424       
425       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
426     }
427     
428     bool hasError() const
429     {
430         return (state == STATE_SOCKET_ERROR);
431     }
432     
433     bool shouldStartNext() const
434     {
435       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
436     }
437 private:
438     bool connectToHost()
439     {
440         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
441         
442         if (!open()) {
443             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
444             return false;
445         }
446         
447         if (connect(host.c_str(), port) != 0) {
448             return false;
449         }
450         
451         return true;
452     }
453     
454     
455     void processHeader()
456     {
457         string h = strutils::simplify(buffer);
458         if (h.empty()) { // blank line terminates headers
459             headersComplete();
460             
461             if (contentGZip || contentDeflate) {
462                 memset(&zlib, 0, sizeof(z_stream));
463                 if (!zlibOutputBuffer) {
464                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
465                 }
466             
467                 // NULLs means we'll get default alloc+free methods
468                 // which is absolutely fine
469                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
470                 zlib.next_out = zlibOutputBuffer;
471                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
472                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
473                 }
474             }
475           
476             if (chunkedTransfer) {
477                 state = STATE_GETTING_CHUNKED;
478             } else if (noMessageBody || (bodyTransferSize == 0)) {
479                 // force the state to GETTING_BODY, to simplify logic in
480                 // responseComplete and handleClose
481                 state = STATE_GETTING_BODY;
482                 responseComplete();
483             } else {
484                 setByteCount(bodyTransferSize); // may be -1, that's fine              
485                 state = STATE_GETTING_BODY;
486             }
487             
488             return;
489         }
490               
491         int colonPos = buffer.find(':');
492         if (colonPos < 0) {
493             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
494             return;
495         }
496         
497         string key = strutils::simplify(buffer.substr(0, colonPos));
498         string lkey = boost::to_lower_copy(key);
499         string value = strutils::strip(buffer.substr(colonPos + 1));
500         
501         // only consider these if getting headers (as opposed to trailers 
502         // of a chunked transfer)
503         if (state == STATE_GETTING_HEADERS) {
504             if (lkey == "content-length") {
505
506                 int sz = strutils::to_int(value);
507                 if (bodyTransferSize <= 0) {
508                     bodyTransferSize = sz;
509                 }
510                 activeRequest->setResponseLength(sz);
511             } else if (lkey == "transfer-length") {
512                 bodyTransferSize = strutils::to_int(value);
513             } else if (lkey == "transfer-encoding") {
514                 processTransferEncoding(value);
515             } else if (lkey == "content-encoding") {
516               if (value == "gzip") {
517                 contentGZip = true;
518               } else if (value == "deflate") {
519                 contentDeflate = true;
520               } else if (value != "identity") {
521                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
522               }
523             }
524         }
525     
526         activeRequest->responseHeader(lkey, value);
527     }
528     
529     void processTransferEncoding(const string& te)
530     {
531         if (te == "chunked") {
532             chunkedTransfer = true;
533         } else {
534             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
535             // failure
536         }
537     }
538     
539     void processChunkHeader()
540     {
541         if (buffer.empty()) {
542             // blank line after chunk data
543             return;
544         }
545         
546         int chunkSize = 0;
547         int semiPos = buffer.find(';');
548         if (semiPos >= 0) {
549             // extensions ignored for the moment
550             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
551         } else {
552             chunkSize = strutils::to_int(buffer, 16);
553         }
554         
555         buffer.clear();
556         if (chunkSize == 0) {  //  trailer start
557             state = STATE_GETTING_TRAILER;
558             return;
559         }
560         
561         state = STATE_GETTING_CHUNKED_BYTES;
562         setByteCount(chunkSize);
563     }
564     
565     void processTrailer()
566     {        
567         if (buffer.empty()) {
568             // end of trailers
569             responseComplete();
570             return;
571         }
572         
573     // process as a normal header
574         processHeader();
575     }
576     
577     void headersComplete()
578     {
579         activeRequest->responseHeadersComplete();
580     }
581     
582     void responseComplete()
583     {
584       //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
585         activeRequest->responseComplete();
586         client->requestFinished(this);
587       
588         if (contentDeflate) {
589           inflateEnd(&zlib);
590         }
591       
592         assert(sentRequests.front() == activeRequest);
593         sentRequests.pop_front();
594         bool doClose = activeRequest->closeAfterComplete();
595         activeRequest = NULL;
596       
597         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
598             if (doClose) {
599           // this will bring us into handleClose() above, which updates
600           // state to STATE_CLOSED
601             close();
602               
603           // if we have additional requests waiting, try to start them now
604             tryStartNextRequest();
605           }
606         }
607         
608       if (state != STATE_CLOSED)  {
609         state = STATE_IDLE;
610       }
611       setTerminator("\r\n");
612     }
613     
614     enum ConnectionState {
615         STATE_IDLE = 0,
616         STATE_GETTING_HEADERS,
617         STATE_GETTING_BODY,
618         STATE_GETTING_CHUNKED,
619         STATE_GETTING_CHUNKED_BYTES,
620         STATE_GETTING_TRAILER,
621         STATE_SOCKET_ERROR,
622         STATE_CLOSED             ///< connection should be closed now
623     };
624     
625     Client* client;
626     Request_ptr activeRequest;
627     ConnectionState state;
628     string host;
629     short port;
630     std::string buffer;
631     int bodyTransferSize;
632     SGTimeStamp idleTime;
633     bool chunkedTransfer;
634     bool noMessageBody;
635     int requestBodyBytesToSend;
636   
637     z_stream zlib;
638     unsigned char* zlibInflateBuffer;
639     int zlibInflateBufferSize;
640     unsigned char* zlibOutputBuffer;
641     bool contentGZip, contentDeflate;
642   
643     std::list<Request_ptr> queuedRequests;
644     std::list<Request_ptr> sentRequests;
645 };
646
647 Client::Client()
648 {
649     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
650 }
651
652 void Client::update(int waitTimeout)
653 {
654     NetChannel::poll(waitTimeout);
655         
656     ConnectionDict::iterator it = _connections.begin();
657     for (; it != _connections.end(); ) {
658         if (it->second->hasIdleTimeout() || it->second->hasError() ||
659             it->second->hasErrorTimeout())
660         {
661         // connection has been idle for a while, clean it up
662         // (or has an error condition, again clean it up)
663             ConnectionDict::iterator del = it++;
664             delete del->second;
665             _connections.erase(del);
666         } else {
667             if (it->second->shouldStartNext()) {
668                 it->second->tryStartNextRequest();
669             }
670             
671             ++it;
672         }
673     } // of connecion iteration
674 }
675
676 void Client::makeRequest(const Request_ptr& r)
677 {
678     string host = r->host();
679     int port = r->port();
680     if (!_proxy.empty()) {
681         host = _proxy;
682         port = _proxyPort;
683     }
684     
685     stringstream ss;
686     ss << host << "-" << port;
687     string connectionId = ss.str();
688     
689     if (_connections.find(connectionId) == _connections.end()) {
690         Connection* con = new Connection(this);
691         con->setServer(host, port);
692         _connections[connectionId] = con;
693     }
694     
695     _connections[connectionId]->queueRequest(r);
696 }
697
698 void Client::requestFinished(Connection* con)
699 {
700     
701 }
702
703 void Client::setUserAgent(const string& ua)
704 {
705     _userAgent = ua;
706 }
707
708 void Client::setProxy(const string& proxy, int port, const string& auth)
709 {
710     _proxy = proxy;
711     _proxyPort = port;
712     _proxyAuth = auth;
713 }
714
715 } // of namespace HTTP
716
717 } // of namespace simgear