]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
b34cfd4eb94c50824174d2b25afa1f51c065fb55
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimHear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24 #include "HTTPClient.hxx"
25
26 #include <sstream>
27 #include <cassert>
28 #include <cstdlib> // rand()
29 #include <list>
30 #include <iostream>
31 #include <errno.h>
32 #include <map>
33
34 #include <boost/foreach.hpp>
35 #include <boost/algorithm/string/case_conv.hpp>
36
37 #include <simgear/io/sg_netChat.hxx>
38 #include <simgear/io/HTTPContentDecode.hxx>
39 #include <simgear/misc/strutils.hxx>
40 #include <simgear/compiler.h>
41 #include <simgear/debug/logstream.hxx>
42 #include <simgear/timing/timestamp.hxx>
43 #include <simgear/structure/exception.hxx>
44
45 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
46 #include "version.h"
47 #else
48 #  if !defined(SIMGEAR_VERSION)
49 #    define SIMGEAR_VERSION "simgear-development"
50 #  endif
51 #endif
52
53 using std::string;
54 using std::stringstream;
55 using std::vector;
56
57 namespace simgear
58 {
59
60 namespace HTTP
61 {
62
63 extern const int DEFAULT_HTTP_PORT = 80;
64 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
65 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
66
67 class Connection;
68 typedef std::multimap<std::string, Connection*> ConnectionDict;
69 typedef std::list<Request_ptr> RequestList;
70
71 class Client::ClientPrivate
72 {
73 public:
74     std::string userAgent;
75     std::string proxy;
76     int proxyPort;
77     std::string proxyAuth;
78     NetChannelPoller poller;
79     unsigned int maxConnections;
80     
81     RequestList pendingRequests;
82     
83 // connections by host (potentially more than one)
84     ConnectionDict connections;
85     
86     SGTimeStamp timeTransferSample;
87     unsigned int bytesTransferred;
88     unsigned int lastTransferRate;
89 };
90   
91 class Connection : public NetChat
92 {
93 public:
94     Connection(Client* pr) :
95         client(pr),
96         state(STATE_CLOSED),
97         port(DEFAULT_HTTP_PORT)
98     {
99     }
100     
101     virtual ~Connection()
102     {
103     }
104   
105     void setServer(const string& h, short p)
106     {
107         host = h;
108         port = p;
109     }
110     
111     // socket-level errors
112     virtual void handleError(int error)
113     {
114         if (error == ENOENT) {
115         // name lookup failure
116             // we won't have an active request yet, so the logic below won't
117             // fire to actually call setFailure. Let's fail all of the requests
118             BOOST_FOREACH(Request_ptr req, sentRequests) {
119                 req->setFailure(error, "hostname lookup failure");
120             }
121             
122             BOOST_FOREACH(Request_ptr req, queuedRequests) {
123                 req->setFailure(error, "hostname lookup failure");
124             }
125             
126         // name lookup failure, abandon all requests on this connection
127             sentRequests.clear();
128             queuedRequests.clear();
129         }
130         
131         NetChat::handleError(error);
132         if (activeRequest) {            
133             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
134             activeRequest->setFailure(error, "socket error");
135             activeRequest = NULL;
136             _contentDecoder.reset();
137         }
138     
139         state = STATE_SOCKET_ERROR;
140     }
141     
142     virtual void handleClose()
143     {      
144         NetChat::handleClose();
145
146     // closing of the connection from the server side when getting the body,
147         bool canCloseState = (state == STATE_GETTING_BODY);
148         if (canCloseState && activeRequest) {
149         // force state here, so responseComplete can avoid closing the 
150         // socket again
151             state =  STATE_CLOSED;
152             responseComplete();
153         } else {
154             if (activeRequest) {
155                 activeRequest->setFailure(500, "server closed connection");
156                 // remove the failed request from sentRequests, so it does 
157                 // not get restored
158                 RequestList::iterator it = std::find(sentRequests.begin(), 
159                     sentRequests.end(), activeRequest);
160                 if (it != sentRequests.end()) {
161                     sentRequests.erase(it);
162                 }
163                 activeRequest = NULL;
164                 _contentDecoder.reset();
165             }
166             
167             state = STATE_CLOSED;
168         }
169       
170       if (sentRequests.empty()) {
171         return;
172       }
173       
174     // restore sent requests to the queue, so they will be re-sent
175     // when the connection opens again
176       queuedRequests.insert(queuedRequests.begin(),
177                               sentRequests.begin(), sentRequests.end());
178       sentRequests.clear();
179     }
180     
181     void handleTimeout()
182     {
183         NetChat::handleError(ETIMEDOUT);
184         if (activeRequest) {
185             SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
186             activeRequest->setFailure(ETIMEDOUT, "socket timeout");
187             activeRequest = NULL;
188             _contentDecoder.reset();
189         }
190         
191         state = STATE_SOCKET_ERROR;
192     }
193     
194     void queueRequest(const Request_ptr& r)
195     {
196         queuedRequests.push_back(r);
197         tryStartNextRequest();
198     }
199     
200     void beginResponse()
201     {
202         assert(!sentRequests.empty());
203         assert(state == STATE_WAITING_FOR_RESPONSE);
204         
205         activeRequest = sentRequests.front();
206         
207       activeRequest->responseStart(buffer);
208       state = STATE_GETTING_HEADERS;
209       buffer.clear();
210       if (activeRequest->responseCode() == 204) {
211         noMessageBody = true;
212       } else if (activeRequest->method() == "HEAD") {
213         noMessageBody = true;
214       } else {
215         noMessageBody = false;
216       }
217
218       bodyTransferSize = -1;
219       chunkedTransfer = false;
220       _contentDecoder.reset();
221     }
222   
223     void tryStartNextRequest()
224     {
225       if (queuedRequests.empty()) {
226         idleTime.stamp();
227         return;
228       }
229       
230       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
231         return;
232       }
233       
234       if (state == STATE_CLOSED) {
235           if (!connectToHost()) {
236               return;
237           }
238           
239           setTerminator("\r\n");
240           state = STATE_IDLE;
241       }
242      
243       Request_ptr r = queuedRequests.front();
244       r->requestStart();
245       requestBodyBytesToSend = r->requestBodyLength();
246           
247       stringstream headerData;
248       string path = r->path();
249       assert(!path.empty());
250       string query = r->query();
251       string bodyData;
252       
253       if (!client->proxyHost().empty()) {
254           path = r->scheme() + "://" + r->host() + r->path();
255       }
256
257       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
258           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
259           bodyData = query.substr(1); // URL-encode, drop the leading '?'
260           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
261           headerData << "Content-Length:" << bodyData.size() << "\r\n";
262       } else {
263           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
264           if (requestBodyBytesToSend >= 0) {
265             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
266             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
267           }
268       }
269       
270       headerData << "Host: " << r->hostAndPort() << "\r\n";
271       headerData << "User-Agent:" << client->userAgent() << "\r\n";
272       headerData << "Accept-Encoding: deflate, gzip\r\n";
273       if (!client->proxyAuth().empty()) {
274           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
275       }
276
277       BOOST_FOREACH(string h, r->requestHeaders()) {
278           headerData << h << ": " << r->header(h) << "\r\n";
279       }
280
281       headerData << "\r\n"; // final CRLF to terminate the headers
282       if (!bodyData.empty()) {
283           headerData << bodyData;
284       }
285       
286       bool ok = push(headerData.str().c_str());
287       if (!ok) {
288           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
289           // we've over-stuffed the socket, give up for now, let things
290           // drain down before trying to start any more requests.
291           return;
292       }
293       
294       while (requestBodyBytesToSend > 0) {
295         char buf[4096];
296         int len = r->getBodyData(buf, 4096);
297         if (len > 0) {
298           requestBodyBytesToSend -= len;
299           if (!bufferSend(buf, len)) {
300             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
301             state = STATE_SOCKET_ERROR;
302             return;
303           }
304       //    SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
305         } else {
306           SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
307           break;
308         }
309       }
310       
311    //   SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
312    //       "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
313     //      "\n\t on connection " << this);
314     // successfully sent, remove from queue, and maybe send the next
315       queuedRequests.pop_front();
316       sentRequests.push_back(r);
317         state = STATE_WAITING_FOR_RESPONSE;
318         
319     // pipelining, let's maybe send the next request right away
320       tryStartNextRequest();
321     }
322     
323     virtual void collectIncomingData(const char* s, int n)
324     {
325         idleTime.stamp();
326         client->receivedBytes(static_cast<unsigned int>(n));
327         
328         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
329             _contentDecoder.receivedBytes(s, n);
330         } else {
331             buffer += string(s, n);
332         }
333     }
334
335     virtual void foundTerminator(void)
336     {
337         idleTime.stamp();
338         switch (state) {
339         case STATE_WAITING_FOR_RESPONSE:
340             beginResponse();
341             break;
342             
343         case STATE_GETTING_HEADERS:
344             processHeader();
345             buffer.clear();
346             break;
347             
348         case STATE_GETTING_BODY:
349             responseComplete();
350             break;
351         
352         case STATE_GETTING_CHUNKED:
353             processChunkHeader();
354             break;
355             
356         case STATE_GETTING_CHUNKED_BYTES:
357             setTerminator("\r\n");
358             state = STATE_GETTING_CHUNKED;
359             buffer.clear();
360             break;
361             
362
363         case STATE_GETTING_TRAILER:
364             processTrailer();
365             buffer.clear();
366             break;
367         
368         case STATE_IDLE:
369             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
370                 
371         default:
372             break;
373         }
374     }
375     
376     bool hasIdleTimeout() const
377     {
378         if (state != STATE_IDLE) {
379             return false;
380         }
381         
382         assert(sentRequests.empty());
383         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
384     }
385   
386     bool hasErrorTimeout() const
387     {
388       if (state == STATE_IDLE) {
389         return false;
390       }
391       
392       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
393     }
394     
395     bool hasError() const
396     {
397         return (state == STATE_SOCKET_ERROR);
398     }
399     
400     bool shouldStartNext() const
401     {
402       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
403     }
404     
405     bool isActive() const
406     {
407         return !queuedRequests.empty() || !sentRequests.empty();
408     }
409 private:
410     bool connectToHost()
411     {
412         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
413         
414         if (!open()) {
415             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
416             return false;
417         }
418         
419         if (connect(host.c_str(), port) != 0) {
420             return false;
421         }
422         
423         return true;
424     }
425     
426     
427     void processHeader()
428     {
429         string h = strutils::simplify(buffer);
430         if (h.empty()) { // blank line terminates headers
431             headersComplete();
432             return;
433         }
434               
435         int colonPos = buffer.find(':');
436         if (colonPos < 0) {
437             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
438             return;
439         }
440         
441         string key = strutils::simplify(buffer.substr(0, colonPos));
442         string lkey = boost::to_lower_copy(key);
443         string value = strutils::strip(buffer.substr(colonPos + 1));
444         
445         // only consider these if getting headers (as opposed to trailers 
446         // of a chunked transfer)
447         if (state == STATE_GETTING_HEADERS) {
448             if (lkey == "content-length") {
449
450                 int sz = strutils::to_int(value);
451                 if (bodyTransferSize <= 0) {
452                     bodyTransferSize = sz;
453                 }
454                 activeRequest->setResponseLength(sz);
455             } else if (lkey == "transfer-length") {
456                 bodyTransferSize = strutils::to_int(value);
457             } else if (lkey == "transfer-encoding") {
458                 processTransferEncoding(value);
459             } else if (lkey == "content-encoding") {
460                 _contentDecoder.setEncoding(value);
461             }
462         }
463     
464         activeRequest->responseHeader(lkey, value);
465     }
466     
467     void processTransferEncoding(const string& te)
468     {
469         if (te == "chunked") {
470             chunkedTransfer = true;
471         } else {
472             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
473             // failure
474         }
475     }
476     
477     void processChunkHeader()
478     {
479         if (buffer.empty()) {
480             // blank line after chunk data
481             return;
482         }
483                 
484         int chunkSize = 0;
485         int semiPos = buffer.find(';');
486         if (semiPos >= 0) {
487             // extensions ignored for the moment
488             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
489         } else {
490             chunkSize = strutils::to_int(buffer, 16);
491         }
492         
493         buffer.clear();
494         if (chunkSize == 0) {  //  trailer start
495             state = STATE_GETTING_TRAILER;
496             return;
497         }
498         
499         state = STATE_GETTING_CHUNKED_BYTES;
500         setByteCount(chunkSize);
501     }
502     
503     void processTrailer()
504     {        
505         if (buffer.empty()) {
506             // end of trailers
507             responseComplete();
508             return;
509         }
510         
511     // process as a normal header
512         processHeader();
513     }
514     
515     void headersComplete()
516     {
517         activeRequest->responseHeadersComplete();
518         _contentDecoder.initWithRequest(activeRequest);
519       
520         if (chunkedTransfer) {
521             state = STATE_GETTING_CHUNKED;
522         } else if (noMessageBody || (bodyTransferSize == 0)) {
523             // force the state to GETTING_BODY, to simplify logic in
524             // responseComplete and handleClose
525             state = STATE_GETTING_BODY;
526             responseComplete();
527         } else {
528             setByteCount(bodyTransferSize); // may be -1, that's fine
529             state = STATE_GETTING_BODY;
530         }
531     }
532     
533     void responseComplete()
534     {
535         Request_ptr completedRequest = activeRequest;        
536         _contentDecoder.finish();
537       
538         assert(sentRequests.front() == activeRequest);
539         sentRequests.pop_front();
540         bool doClose = activeRequest->closeAfterComplete();
541         activeRequest = NULL;
542       
543         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
544             if (doClose) {
545           // this will bring us into handleClose() above, which updates
546           // state to STATE_CLOSED
547               close();
548               
549           // if we have additional requests waiting, try to start them now
550               tryStartNextRequest();
551             }
552         }
553         
554         if (state != STATE_CLOSED)  {
555             state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
556         }
557         
558     // notify request after we change state, so this connection is idle
559     // if completion triggers other requests (which is likely)
560         //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
561         completedRequest->responseComplete();
562         client->requestFinished(this);
563         
564         setTerminator("\r\n");
565     }
566     
567     enum ConnectionState {
568         STATE_IDLE = 0,
569         STATE_WAITING_FOR_RESPONSE,
570         STATE_GETTING_HEADERS,
571         STATE_GETTING_BODY,
572         STATE_GETTING_CHUNKED,
573         STATE_GETTING_CHUNKED_BYTES,
574         STATE_GETTING_TRAILER,
575         STATE_SOCKET_ERROR,
576         STATE_CLOSED             ///< connection should be closed now
577     };
578     
579     Client* client;
580     Request_ptr activeRequest;
581     ConnectionState state;
582     string host;
583     short port;
584     std::string buffer;
585     int bodyTransferSize;
586     SGTimeStamp idleTime;
587     bool chunkedTransfer;
588     bool noMessageBody;
589     int requestBodyBytesToSend;
590     
591     RequestList queuedRequests;
592     RequestList sentRequests;
593     
594     ContentDecoder _contentDecoder;
595 };
596
597 Client::Client() :
598     d(new ClientPrivate)
599 {
600     d->proxyPort = 0;
601     d->maxConnections = 4;
602     d->bytesTransferred = 0;
603     d->lastTransferRate = 0;
604     d->timeTransferSample.stamp();
605     
606     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
607 }
608
609 Client::~Client()
610 {
611 }
612
613 void Client::setMaxConnections(unsigned int maxCon)
614 {
615     if (maxCon < 1) {
616         throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
617     }
618     
619     d->maxConnections = maxCon;
620 }
621
622 void Client::update(int waitTimeout)
623 {
624     d->poller.poll(waitTimeout);
625     bool waitingRequests = !d->pendingRequests.empty();
626     
627     ConnectionDict::iterator it = d->connections.begin();
628     for (; it != d->connections.end(); ) {
629         Connection* con = it->second;
630         if (con->hasIdleTimeout() || 
631             con->hasError() ||
632             con->hasErrorTimeout() ||
633             (!con->isActive() && waitingRequests))
634         {
635             if (con->hasErrorTimeout()) {
636                 // tell the connection we're timing it out
637                 con->handleTimeout();
638             }
639             
640         // connection has been idle for a while, clean it up
641         // (or if we have requests waiting for a different host,
642         // or an error condition
643             ConnectionDict::iterator del = it++;
644             delete del->second;
645             d->connections.erase(del);
646         } else {
647             if (it->second->shouldStartNext()) {
648                 it->second->tryStartNextRequest();
649             }
650             ++it;
651         }
652     } // of connection iteration
653     
654     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
655         RequestList waiting(d->pendingRequests);
656         d->pendingRequests.clear();
657         
658         // re-submit all waiting requests in order; this takes care of
659         // finding multiple pending items targetted to the same (new)
660         // connection
661         BOOST_FOREACH(Request_ptr req, waiting) {
662             makeRequest(req);
663         }
664     }
665 }
666
667 void Client::makeRequest(const Request_ptr& r)
668 {
669     string host = r->host();
670     int port = r->port();
671     if (!d->proxy.empty()) {
672         host = d->proxy;
673         port = d->proxyPort;
674     }
675     
676     Connection* con = NULL;
677     stringstream ss;
678     ss << host << "-" << port;
679     string connectionId = ss.str();
680     bool havePending = !d->pendingRequests.empty();
681     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
682     ConnectionDict::iterator consEnd = d->connections.end();
683      
684     // assign request to an existing Connection.
685     // various options exist here, examined in order
686     ConnectionDict::iterator it = d->connections.find(connectionId);
687     if (atConnectionsLimit && (it == consEnd)) {
688         // maximum number of connections active, queue this request
689         // when a connection goes inactive, we'll start this one            
690         d->pendingRequests.push_back(r);
691         return;
692     }
693     
694     // scan for an idle Connection to the same host (likely if we're
695     // retrieving multiple resources from the same host in quick succession)
696     // if we have pending requests (waiting for a free Connection), then
697     // force new requests on this id to always use the first Connection
698     // (instead of the random selection below). This ensures that when
699     // there's pressure on the number of connections to keep alive, one
700     // host can't DoS every other.
701     int count = 0;
702     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
703         if (havePending || !it->second->isActive()) {
704             con = it->second;
705             break;
706         }
707     }
708     
709     if (!con && atConnectionsLimit) {
710         // all current connections are busy (active), and we don't
711         // have free connections to allocate, so let's assign to
712         // an existing one randomly. Ideally we'd used whichever one will
713         // complete first but we don't have that info.
714         int index = rand() % count;
715         for (it = d->connections.find(connectionId); index > 0; --index) { ; }
716         con = it->second;
717     }
718     
719     // allocate a new connection object
720     if (!con) {
721         con = new Connection(this);
722         con->setServer(host, port);
723         d->poller.addChannel(con);
724         d->connections.insert(d->connections.end(), 
725             ConnectionDict::value_type(connectionId, con));
726     }
727     
728     con->queueRequest(r);
729 }
730
731 void Client::requestFinished(Connection* con)
732 {
733     
734 }
735
736 void Client::setUserAgent(const string& ua)
737 {
738     d->userAgent = ua;
739 }
740
741 const std::string& Client::userAgent() const
742 {
743     return d->userAgent;
744 }
745     
746 const std::string& Client::proxyHost() const
747 {
748     return d->proxy;
749 }
750     
751 const std::string& Client::proxyAuth() const
752 {
753     return d->proxyAuth;
754 }
755
756 void Client::setProxy(const string& proxy, int port, const string& auth)
757 {
758     d->proxy = proxy;
759     d->proxyPort = port;
760     d->proxyAuth = auth;
761 }
762
763 bool Client::hasActiveRequests() const
764 {
765     ConnectionDict::const_iterator it = d->connections.begin();
766     for (; it != d->connections.end(); ++it) {
767         if (it->second->isActive()) return true;
768     }
769     
770     return false;
771 }
772
773 void Client::receivedBytes(unsigned int count)
774 {
775     d->bytesTransferred += count;
776 }
777     
778 unsigned int Client::transferRateBytesPerSec() const
779 {
780     unsigned int e = d->timeTransferSample.elapsedMSec();
781     if (e < 400) {
782         // if called too frequently, return cahced value, to smooth out
783         // < 1 sec changes in flow
784         return d->lastTransferRate;
785     }
786     
787     unsigned int ratio = (d->bytesTransferred * 1000) / e;
788     d->timeTransferSample.stamp();
789     d->bytesTransferred = 0;
790     d->lastTransferRate = ratio;
791     return ratio;
792 }
793
794 } // of namespace HTTP
795
796 } // of namespace simgear