]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
2eff7439ba3e1adaada1edb1c40c2012a89822a5
[simgear.git] / simgear / io / HTTPClient.cxx
1 #include "HTTPClient.hxx"
2
3 #include <sstream>
4 #include <cassert>
5 #include <list>
6 #include <iostream>
7
8 #include <boost/foreach.hpp>
9 #include <boost/algorithm/string/case_conv.hpp>
10
11 #include <zlib.h>
12
13 #include <simgear/io/sg_netChat.hxx>
14 #include <simgear/io/lowlevel.hxx>
15 #include <simgear/misc/strutils.hxx>
16 #include <simgear/compiler.h>
17 #include <simgear/debug/logstream.hxx>
18 #include <simgear/timing/timestamp.hxx>
19
20 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
21 #include "version.h"
22 #else
23 #  if !defined(SIMGEAR_VERSION)
24 #    define SIMGEAR_VERSION "simgear-development"
25 #  endif
26 #endif
27
28 using std::string;
29 using std::stringstream;
30 using std::vector;
31
32 namespace simgear
33 {
34
35 namespace HTTP
36 {
37
38 extern const int DEFAULT_HTTP_PORT = 80;
39 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
40 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
41 const int ZLIB_DECOMPRESS_BUFFER_SIZE = 32 * 1024;
42 const int ZLIB_INFLATE_WINDOW_BITS = -MAX_WBITS;
43   
44 // see http://www.ietf.org/rfc/rfc1952.txt for these values and
45 // detailed description of the logic 
46 const int GZIP_HEADER_ID1 = 31;
47 const int GZIP_HEADER_ID2 = 139;
48 const int GZIP_HEADER_METHOD_DEFLATE = 8;
49 const int GZIP_HEADER_SIZE = 10;
50 const int GZIP_HEADER_FEXTRA = 1 << 2;
51 const int GZIP_HEADER_FNAME = 1 << 3;
52 const int GZIP_HEADER_COMMENT = 1 << 4;
53 const int GZIP_HEADER_CRC = 1 << 1;
54   
55 class Connection : public NetChat
56 {
57 public:
58     Connection(Client* pr) :
59         client(pr),
60         state(STATE_CLOSED),
61         port(DEFAULT_HTTP_PORT),
62         zlibInflateBuffer(NULL),
63         zlibInflateBufferSize(0),
64         zlibOutputBuffer(NULL)
65     {
66         
67     }
68     
69     virtual ~Connection()
70     {
71       if (zlibInflateBuffer) {
72         free(zlibInflateBuffer);
73       }
74       
75       if (zlibOutputBuffer) {
76         free(zlibOutputBuffer);
77       }
78     }
79   
80     void setServer(const string& h, short p)
81     {
82         host = h;
83         port = p;
84     }
85     
86     // socket-level errors
87     virtual void handleError(int error)
88     {        
89         NetChat::handleError(error);
90         if (activeRequest) {
91             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
92             activeRequest->setFailure(error, "socket error");
93             activeRequest = NULL;
94         }
95     
96         state = STATE_SOCKET_ERROR;
97     }
98     
99     virtual void handleClose()
100     {      
101         NetChat::handleClose();
102         
103         if ((state == STATE_GETTING_BODY) && activeRequest) {
104         // force state here, so responseComplete can avoid closing the 
105         // socket again
106             state =  STATE_CLOSED;
107             responseComplete();
108         } else {
109             state = STATE_CLOSED;
110         }
111       
112       if (sentRequests.empty()) {
113         return;
114       }
115       
116     // restore sent requests to the queue, so they will be re-sent
117     // when the connection opens again
118       queuedRequests.insert(queuedRequests.begin(),
119                               sentRequests.begin(), sentRequests.end());
120       sentRequests.clear();
121     }
122     
123     void queueRequest(const Request_ptr& r)
124     {
125       queuedRequests.push_back(r);
126       tryStartNextRequest();
127     }
128     
129     void beginResponse()
130     {
131       assert(!sentRequests.empty());
132       
133       activeRequest = sentRequests.front();      
134       activeRequest->responseStart(buffer);
135       state = STATE_GETTING_HEADERS;
136       buffer.clear();
137       if (activeRequest->responseCode() == 204) {
138         noMessageBody = true;
139       } else if (activeRequest->method() == "HEAD") {
140         noMessageBody = true;
141       } else {
142         noMessageBody = false;
143       }
144
145       bodyTransferSize = -1;
146       chunkedTransfer = false;
147       contentGZip = contentDeflate = false;
148     }
149   
150     void tryStartNextRequest()
151     {
152       if (queuedRequests.empty()) {
153         idleTime.stamp();
154         return;
155       }
156       
157       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
158         return;
159       }
160       
161       if (state == STATE_CLOSED) {
162           if (!connectToHost()) {
163               return;
164           }
165           
166           setTerminator("\r\n");
167           state = STATE_IDLE;
168       }
169      
170       Request_ptr r = queuedRequests.front();
171       requestBodyBytesToSend = r->requestBodyLength();
172           
173       stringstream headerData;
174       string path = r->path();
175       assert(!path.empty());
176       string query = r->query();
177       string bodyData;
178       
179       if (!client->proxyHost().empty()) {
180           path = r->scheme() + "://" + r->host() + r->path();
181       }
182
183       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
184           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
185           bodyData = query.substr(1); // URL-encode, drop the leading '?'
186           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
187           headerData << "Content-Length:" << bodyData.size() << "\r\n";
188       } else {
189           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
190           if (requestBodyBytesToSend >= 0) {
191             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
192             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
193           }
194       }
195       
196       headerData << "Host: " << r->hostAndPort() << "\r\n";
197       headerData << "User-Agent:" << client->userAgent() << "\r\n";
198       headerData << "Accept-Encoding: deflate, gzip\r\n";
199       if (!client->proxyAuth().empty()) {
200           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
201       }
202
203       BOOST_FOREACH(string h, r->requestHeaders()) {
204           headerData << h << ": " << r->header(h) << "\r\n";
205       }
206
207       headerData << "\r\n"; // final CRLF to terminate the headers
208       if (!bodyData.empty()) {
209           headerData << bodyData;
210       }
211       
212       bool ok = push(headerData.str().c_str());
213       if (!ok) {
214           // we've over-stuffed the socket, give up for now, let things
215           // drain down before trying to start any more requests.
216           return;
217       }
218       
219       while (requestBodyBytesToSend > 0) {
220         char buf[4096];
221         int len = 4096;
222         r->getBodyData(buf, len);
223         if (len > 0) {
224           requestBodyBytesToSend -= len;
225           if (!bufferSend(buf, len)) {
226             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
227             state = STATE_SOCKET_ERROR;
228             return;
229           }
230         } else {
231           SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
232           break;
233         }
234       }
235       
236       //std::cout << "did send request:" << r->url() << std::endl;
237     // successfully sent, remove from queue, and maybe send the next
238       queuedRequests.pop_front();
239       sentRequests.push_back(r);
240       
241     // pipelining, let's maybe send the next request right away
242       tryStartNextRequest();
243     }
244     
245     virtual void collectIncomingData(const char* s, int n)
246     {
247         idleTime.stamp();
248         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
249           if (contentGZip || contentDeflate) {
250             expandCompressedData(s, n);
251           } else {
252             activeRequest->processBodyBytes(s, n);
253           }
254         } else {
255             buffer += string(s, n);
256         }
257     }
258
259     
260     void expandCompressedData(const char* s, int n)
261     {
262       int reqSize = n + zlib.avail_in;
263       if (reqSize > zlibInflateBufferSize) {
264       // reallocate
265         unsigned char* newBuf = (unsigned char*) malloc(reqSize);
266         memcpy(newBuf, zlib.next_in, zlib.avail_in);
267         memcpy(newBuf + zlib.avail_in, s, n);
268         free(zlibInflateBuffer);
269         zlibInflateBuffer = newBuf;
270         zlibInflateBufferSize = reqSize;
271       } else {
272       // important to use memmove here, since it's very likely
273       // the source and destination ranges overlap
274         memmove(zlibInflateBuffer, zlib.next_in, zlib.avail_in);
275         memcpy(zlibInflateBuffer + zlib.avail_in, s, n);
276       }
277             
278       zlib.next_in = (unsigned char*) zlibInflateBuffer;
279       zlib.avail_in = reqSize;
280       zlib.next_out = zlibOutputBuffer;
281       zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
282       
283       if (contentGZip) {
284         // we clear this down to contentDeflate once the GZip header has been seen
285         if (reqSize < GZIP_HEADER_SIZE) {
286           return; // need more header bytes
287         }
288         
289         if ((zlibInflateBuffer[0] != GZIP_HEADER_ID1) ||
290             (zlibInflateBuffer[1] != GZIP_HEADER_ID2) ||
291             (zlibInflateBuffer[2] != GZIP_HEADER_METHOD_DEFLATE))
292         {
293           return; // invalid GZip header
294         }
295         
296         char flags = zlibInflateBuffer[3];
297         int gzipHeaderSize =  GZIP_HEADER_SIZE;
298         if (flags & GZIP_HEADER_FEXTRA) {
299           gzipHeaderSize += 2;
300           if (reqSize < gzipHeaderSize) {
301             return; // need more header bytes
302           }
303           
304           unsigned short extraHeaderBytes = *(reinterpret_cast<unsigned short*>(zlibInflateBuffer + GZIP_HEADER_FEXTRA));
305           if ( sgIsBigEndian() ) {
306               sgEndianSwap( &extraHeaderBytes );
307           }
308           
309           gzipHeaderSize += extraHeaderBytes;
310           if (reqSize < gzipHeaderSize) {
311             return; // need more header bytes
312           }
313         }
314         
315         if (flags & GZIP_HEADER_FNAME) {
316           gzipHeaderSize++;
317           while (gzipHeaderSize <= reqSize) {
318             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
319               break; // found terminating NULL character
320             }
321           }
322         }
323         
324         if (flags & GZIP_HEADER_COMMENT) {
325           gzipHeaderSize++;
326           while (gzipHeaderSize <= reqSize) {
327             if (zlibInflateBuffer[gzipHeaderSize-1] == 0) {
328               break; // found terminating NULL character
329             }
330           }
331         }
332         
333         if (flags & GZIP_HEADER_CRC) {
334           gzipHeaderSize += 2;
335         }
336         
337         if (reqSize < gzipHeaderSize) {
338           return; // need more header bytes
339         }
340         
341         zlib.next_in += gzipHeaderSize;
342         zlib.avail_in = reqSize - gzipHeaderSize;
343       // now we've processed the GZip header, can decode as deflate
344         contentGZip = false;
345         contentDeflate = true;
346       }
347       
348       int writtenSize = 0;
349       do {
350         int result = inflate(&zlib, Z_NO_FLUSH);
351         if (result == Z_OK || result == Z_STREAM_END) {
352               
353         } else {
354           SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
355           return;
356         }
357             
358         writtenSize = ZLIB_DECOMPRESS_BUFFER_SIZE - zlib.avail_out;
359       } while ((writtenSize == 0) && (zlib.avail_in > 0));
360     
361       if (writtenSize > 0) {
362         activeRequest->processBodyBytes((const char*) zlibOutputBuffer, writtenSize);
363       }
364     }
365     
366     virtual void foundTerminator(void)
367     {
368         idleTime.stamp();
369         switch (state) {
370         case STATE_IDLE:
371             beginResponse();
372             break;
373             
374         case STATE_GETTING_HEADERS:
375             processHeader();
376             buffer.clear();
377             break;
378             
379         case STATE_GETTING_BODY:
380             responseComplete();
381             break;
382         
383         case STATE_GETTING_CHUNKED:
384             processChunkHeader();
385             break;
386             
387         case STATE_GETTING_CHUNKED_BYTES:
388             setTerminator("\r\n");
389             state = STATE_GETTING_CHUNKED;
390             break;
391             
392
393         case STATE_GETTING_TRAILER:
394             processTrailer();
395             buffer.clear();
396             break;
397         
398         default:
399             break;
400         }
401     }
402     
403     bool hasIdleTimeout() const
404     {
405         if (state != STATE_IDLE) {
406             return false;
407         }
408         
409         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
410     }
411   
412     bool hasErrorTimeout() const
413     {
414       if (state == STATE_IDLE) {
415         return false;
416       }
417       
418       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
419     }
420     
421     bool hasError() const
422     {
423         return (state == STATE_SOCKET_ERROR);
424     }
425     
426     bool shouldStartNext() const
427     {
428       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
429     }
430 private:
431     bool connectToHost()
432     {
433         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
434         
435         if (!open()) {
436             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
437             return false;
438         }
439         
440         if (connect(host.c_str(), port) != 0) {
441             return false;
442         }
443         
444         return true;
445     }
446     
447     
448     void processHeader()
449     {
450         string h = strutils::simplify(buffer);
451         if (h.empty()) { // blank line terminates headers
452             headersComplete();
453             
454             if (contentGZip || contentDeflate) {
455                 memset(&zlib, 0, sizeof(z_stream));
456                 if (!zlibOutputBuffer) {
457                     zlibOutputBuffer = (unsigned char*) malloc(ZLIB_DECOMPRESS_BUFFER_SIZE);
458                 }
459             
460                 // NULLs means we'll get default alloc+free methods
461                 // which is absolutely fine
462                 zlib.avail_out = ZLIB_DECOMPRESS_BUFFER_SIZE;
463                 zlib.next_out = zlibOutputBuffer;
464                 if (inflateInit2(&zlib, ZLIB_INFLATE_WINDOW_BITS) != Z_OK) {
465                   SG_LOG(SG_IO, SG_WARN, "inflateInit2 failed");
466                 }
467             }
468           
469             if (chunkedTransfer) {
470                 state = STATE_GETTING_CHUNKED;
471             } else if (noMessageBody || (bodyTransferSize == 0)) {
472                 // force the state to GETTING_BODY, to simplify logic in
473                 // responseComplete and handleClose
474                 state = STATE_GETTING_BODY;
475                 responseComplete();
476             } else {
477                 setByteCount(bodyTransferSize); // may be -1, that's fine              
478                 state = STATE_GETTING_BODY;
479             }
480             
481             return;
482         }
483               
484         int colonPos = buffer.find(':');
485         if (colonPos < 0) {
486             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
487             return;
488         }
489         
490         string key = strutils::simplify(buffer.substr(0, colonPos));
491         string lkey = boost::to_lower_copy(key);
492         string value = strutils::strip(buffer.substr(colonPos + 1));
493         
494         // only consider these if getting headers (as opposed to trailers 
495         // of a chunked transfer)
496         if (state == STATE_GETTING_HEADERS) {
497             if (lkey == "content-length") {
498
499                 int sz = strutils::to_int(value);
500                 if (bodyTransferSize <= 0) {
501                     bodyTransferSize = sz;
502                 }
503                 activeRequest->setResponseLength(sz);
504             } else if (lkey == "transfer-length") {
505                 bodyTransferSize = strutils::to_int(value);
506             } else if (lkey == "transfer-encoding") {
507                 processTransferEncoding(value);
508             } else if (lkey == "content-encoding") {
509               if (value == "gzip") {
510                 contentGZip = true;
511               } else if (value == "deflate") {
512                 contentDeflate = true;
513               } else if (value != "identity") {
514                 SG_LOG(SG_IO, SG_WARN, "unsupported content encoding:" << value);
515               }
516             }
517         }
518     
519         activeRequest->responseHeader(lkey, value);
520     }
521     
522     void processTransferEncoding(const string& te)
523     {
524         if (te == "chunked") {
525             chunkedTransfer = true;
526         } else {
527             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
528             // failure
529         }
530     }
531     
532     void processChunkHeader()
533     {
534         if (buffer.empty()) {
535             // blank line after chunk data
536             return;
537         }
538         
539         int chunkSize = 0;
540         int semiPos = buffer.find(';');
541         if (semiPos >= 0) {
542             // extensions ignored for the moment
543             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
544         } else {
545             chunkSize = strutils::to_int(buffer, 16);
546         }
547         
548         buffer.clear();
549         if (chunkSize == 0) {  //  trailer start
550             state = STATE_GETTING_TRAILER;
551             return;
552         }
553         
554         state = STATE_GETTING_CHUNKED_BYTES;
555         setByteCount(chunkSize);
556     }
557     
558     void processTrailer()
559     {        
560         if (buffer.empty()) {
561             // end of trailers
562             responseComplete();
563             return;
564         }
565         
566     // process as a normal header
567         processHeader();
568     }
569     
570     void headersComplete()
571     {
572         activeRequest->responseHeadersComplete();
573     }
574     
575     void responseComplete()
576     {
577       //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
578         activeRequest->responseComplete();
579         client->requestFinished(this);
580       
581         if (contentDeflate) {
582           inflateEnd(&zlib);
583         }
584       
585         assert(sentRequests.front() == activeRequest);
586         sentRequests.pop_front();
587         bool doClose = activeRequest->closeAfterComplete();
588         activeRequest = NULL;
589       
590         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
591             if (doClose) {
592           // this will bring us into handleClose() above, which updates
593           // state to STATE_CLOSED
594             close();
595               
596           // if we have additional requests waiting, try to start them now
597             tryStartNextRequest();
598           }
599         }
600         
601       if (state != STATE_CLOSED)  {
602         state = STATE_IDLE;
603       }
604       setTerminator("\r\n");
605     }
606     
607     enum ConnectionState {
608         STATE_IDLE = 0,
609         STATE_GETTING_HEADERS,
610         STATE_GETTING_BODY,
611         STATE_GETTING_CHUNKED,
612         STATE_GETTING_CHUNKED_BYTES,
613         STATE_GETTING_TRAILER,
614         STATE_SOCKET_ERROR,
615         STATE_CLOSED             ///< connection should be closed now
616     };
617     
618     Client* client;
619     Request_ptr activeRequest;
620     ConnectionState state;
621     string host;
622     short port;
623     std::string buffer;
624     int bodyTransferSize;
625     SGTimeStamp idleTime;
626     bool chunkedTransfer;
627     bool noMessageBody;
628     int requestBodyBytesToSend;
629   
630     z_stream zlib;
631     unsigned char* zlibInflateBuffer;
632     int zlibInflateBufferSize;
633     unsigned char* zlibOutputBuffer;
634     bool contentGZip, contentDeflate;
635   
636     std::list<Request_ptr> queuedRequests;
637     std::list<Request_ptr> sentRequests;
638 };
639
640 Client::Client()
641 {
642     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
643 }
644
645 void Client::update(int waitTimeout)
646 {
647     NetChannel::poll(waitTimeout);
648         
649     ConnectionDict::iterator it = _connections.begin();
650     for (; it != _connections.end(); ) {
651         if (it->second->hasIdleTimeout() || it->second->hasError() ||
652             it->second->hasErrorTimeout())
653         {
654         // connection has been idle for a while, clean it up
655         // (or has an error condition, again clean it up)
656             ConnectionDict::iterator del = it++;
657             delete del->second;
658             _connections.erase(del);
659         } else {
660             if (it->second->shouldStartNext()) {
661                 it->second->tryStartNextRequest();
662             }
663             
664             ++it;
665         }
666     } // of connecion iteration
667 }
668
669 void Client::makeRequest(const Request_ptr& r)
670 {
671     string host = r->host();
672     int port = r->port();
673     if (!_proxy.empty()) {
674         host = _proxy;
675         port = _proxyPort;
676     }
677     
678     stringstream ss;
679     ss << host << "-" << port;
680     string connectionId = ss.str();
681     
682     if (_connections.find(connectionId) == _connections.end()) {
683         Connection* con = new Connection(this);
684         con->setServer(host, port);
685         _connections[connectionId] = con;
686     }
687     
688     _connections[connectionId]->queueRequest(r);
689 }
690
691 void Client::requestFinished(Connection* con)
692 {
693     
694 }
695
696 void Client::setUserAgent(const string& ua)
697 {
698     _userAgent = ua;
699 }
700
701 void Client::setProxy(const string& proxy, int port, const string& auth)
702 {
703     _proxy = proxy;
704     _proxyPort = port;
705     _proxyAuth = auth;
706 }
707
708 } // of namespace HTTP
709
710 } // of namespace simgear