]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
8b4b25941bded817096a6323cd02e8d5beba4ffe
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimHear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24 #include "HTTPClient.hxx"
25
26 #include <sstream>
27 #include <cassert>
28 #include <cstdlib> // rand()
29 #include <list>
30 #include <iostream>
31 #include <errno.h>
32 #include <map>
33 #include <stdexcept>
34
35 #include <boost/foreach.hpp>
36 #include <boost/algorithm/string/case_conv.hpp>
37
38 #include <simgear/io/sg_netChat.hxx>
39 #include <simgear/io/HTTPContentDecode.hxx>
40 #include <simgear/misc/strutils.hxx>
41 #include <simgear/compiler.h>
42 #include <simgear/debug/logstream.hxx>
43 #include <simgear/timing/timestamp.hxx>
44 #include <simgear/structure/exception.hxx>
45
46 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
47 #include "version.h"
48 #else
49 #  if !defined(SIMGEAR_VERSION)
50 #    define SIMGEAR_VERSION "simgear-development"
51 #  endif
52 #endif
53
54 using std::string;
55 using std::stringstream;
56 using std::vector;
57
58 namespace simgear
59 {
60
61 namespace HTTP
62 {
63
64 extern const int DEFAULT_HTTP_PORT = 80;
65 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
66 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
67
68 class Connection;
69 typedef std::multimap<std::string, Connection*> ConnectionDict;
70 typedef std::list<Request_ptr> RequestList;
71
72 class Client::ClientPrivate
73 {
74 public:
75     std::string userAgent;
76     std::string proxy;
77     int proxyPort;
78     std::string proxyAuth;
79     NetChannelPoller poller;
80     unsigned int maxConnections;
81     
82     RequestList pendingRequests;
83     
84 // connections by host (potentially more than one)
85     ConnectionDict connections;
86     
87     SGTimeStamp timeTransferSample;
88     unsigned int bytesTransferred;
89     unsigned int lastTransferRate;
90 };
91   
92 class Connection : public NetChat
93 {
94 public:
95     Connection(Client* pr) :
96         client(pr),
97         state(STATE_CLOSED),
98         port(DEFAULT_HTTP_PORT)
99     {
100     }
101     
102     virtual ~Connection()
103     {
104     }
105   
106     void setServer(const string& h, short p)
107     {
108         host = h;
109         port = p;
110     }
111     
112     // socket-level errors
113     virtual void handleError(int error)
114     {
115         if (error == ENOENT) {
116         // name lookup failure
117             // we won't have an active request yet, so the logic below won't
118             // fire to actually call setFailure. Let's fail all of the requests
119             BOOST_FOREACH(Request_ptr req, sentRequests) {
120                 req->setFailure(error, "hostname lookup failure");
121             }
122             
123             BOOST_FOREACH(Request_ptr req, queuedRequests) {
124                 req->setFailure(error, "hostname lookup failure");
125             }
126             
127         // name lookup failure, abandon all requests on this connection
128             sentRequests.clear();
129             queuedRequests.clear();
130         }
131         
132         NetChat::handleError(error);
133         if (activeRequest) {            
134             SG_LOG(SG_IO, SG_INFO, "HTTP socket error");
135             activeRequest->setFailure(error, "socket error");
136             activeRequest = NULL;
137             _contentDecoder.reset();
138         }
139     
140         state = STATE_SOCKET_ERROR;
141     }
142     
143     virtual void handleClose()
144     {      
145         NetChat::handleClose();
146
147     // closing of the connection from the server side when getting the body,
148         bool canCloseState = (state == STATE_GETTING_BODY);
149         if (canCloseState && activeRequest) {
150         // force state here, so responseComplete can avoid closing the 
151         // socket again
152             state =  STATE_CLOSED;
153             responseComplete();
154         } else {
155             if (activeRequest) {
156                 activeRequest->setFailure(500, "server closed connection");
157                 // remove the failed request from sentRequests, so it does 
158                 // not get restored
159                 RequestList::iterator it = std::find(sentRequests.begin(), 
160                     sentRequests.end(), activeRequest);
161                 if (it != sentRequests.end()) {
162                     sentRequests.erase(it);
163                 }
164                 activeRequest = NULL;
165                 _contentDecoder.reset();
166             }
167             
168             state = STATE_CLOSED;
169         }
170       
171       if (sentRequests.empty()) {
172         return;
173       }
174       
175     // restore sent requests to the queue, so they will be re-sent
176     // when the connection opens again
177       queuedRequests.insert(queuedRequests.begin(),
178                               sentRequests.begin(), sentRequests.end());
179       sentRequests.clear();
180     }
181     
182     void handleTimeout()
183     {
184         NetChat::handleError(ETIMEDOUT);
185         if (activeRequest) {
186             SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
187             activeRequest->setFailure(ETIMEDOUT, "socket timeout");
188             activeRequest = NULL;
189             _contentDecoder.reset();
190         }
191         
192         state = STATE_SOCKET_ERROR;
193     }
194     
195     void queueRequest(const Request_ptr& r)
196     {
197         queuedRequests.push_back(r);
198         tryStartNextRequest();
199     }
200     
201     void beginResponse()
202     {
203         assert(!sentRequests.empty());
204         assert(state == STATE_WAITING_FOR_RESPONSE);
205         
206         activeRequest = sentRequests.front();
207         
208       activeRequest->responseStart(buffer);
209       state = STATE_GETTING_HEADERS;
210       buffer.clear();
211       if (activeRequest->responseCode() == 204) {
212         noMessageBody = true;
213       } else if (activeRequest->method() == "HEAD") {
214         noMessageBody = true;
215       } else {
216         noMessageBody = false;
217       }
218
219       bodyTransferSize = -1;
220       chunkedTransfer = false;
221       _contentDecoder.reset();
222     }
223   
224     void tryStartNextRequest()
225     {
226       if (queuedRequests.empty()) {
227         idleTime.stamp();
228         return;
229       }
230       
231       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
232         return;
233       }
234       
235       if (state == STATE_CLOSED) {
236           if (!connectToHost()) {
237               return;
238           }
239           
240           setTerminator("\r\n");
241           state = STATE_IDLE;
242       }
243      
244       Request_ptr r = queuedRequests.front();
245       r->requestStart();
246       requestBodyBytesToSend = r->requestBodyLength();
247           
248       stringstream headerData;
249       string path = r->path();
250       assert(!path.empty());
251       string query = r->query();
252       string bodyData;
253       
254       if (!client->proxyHost().empty()) {
255           path = r->scheme() + "://" + r->host() + r->path();
256       }
257
258       if (r->requestBodyType() == CONTENT_TYPE_URL_ENCODED) {
259           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
260           bodyData = query.substr(1); // URL-encode, drop the leading '?'
261           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
262           headerData << "Content-Length:" << bodyData.size() << "\r\n";
263       } else {
264           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
265           if (requestBodyBytesToSend >= 0) {
266             headerData << "Content-Length:" << requestBodyBytesToSend << "\r\n";
267             headerData << "Content-Type:" << r->requestBodyType() << "\r\n";
268           }
269       }
270       
271       headerData << "Host: " << r->hostAndPort() << "\r\n";
272       headerData << "User-Agent:" << client->userAgent() << "\r\n";
273       headerData << "Accept-Encoding: deflate, gzip\r\n";
274       if (!client->proxyAuth().empty()) {
275           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
276       }
277
278       BOOST_FOREACH(string h, r->requestHeaders()) {
279           headerData << h << ": " << r->header(h) << "\r\n";
280       }
281
282       headerData << "\r\n"; // final CRLF to terminate the headers
283       if (!bodyData.empty()) {
284           headerData << bodyData;
285       }
286       
287       bool ok = push(headerData.str().c_str());
288       if (!ok) {
289           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
290           // we've over-stuffed the socket, give up for now, let things
291           // drain down before trying to start any more requests.
292           return;
293       }
294       
295       while (requestBodyBytesToSend > 0) {
296         char buf[4096];
297         int len = r->getBodyData(buf, 4096);
298         if (len > 0) {
299           requestBodyBytesToSend -= len;
300           if (!bufferSend(buf, len)) {
301             SG_LOG(SG_IO, SG_WARN, "overflow the HTTP::Connection output buffer");
302             state = STATE_SOCKET_ERROR;
303             return;
304           }
305       //    SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
306         } else {
307           SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
308           break;
309         }
310       }
311       
312    //   SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
313    //       "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
314     //      "\n\t on connection " << this);
315     // successfully sent, remove from queue, and maybe send the next
316       queuedRequests.pop_front();
317       sentRequests.push_back(r);
318         state = STATE_WAITING_FOR_RESPONSE;
319         
320     // pipelining, let's maybe send the next request right away
321       tryStartNextRequest();
322     }
323     
324     virtual void collectIncomingData(const char* s, int n)
325     {
326         idleTime.stamp();
327         client->receivedBytes(static_cast<unsigned int>(n));
328         
329         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_CHUNKED_BYTES)) {
330             _contentDecoder.receivedBytes(s, n);
331         } else {
332             buffer += string(s, n);
333         }
334     }
335
336     virtual void foundTerminator(void)
337     {
338         idleTime.stamp();
339         switch (state) {
340         case STATE_WAITING_FOR_RESPONSE:
341             beginResponse();
342             break;
343             
344         case STATE_GETTING_HEADERS:
345             processHeader();
346             buffer.clear();
347             break;
348             
349         case STATE_GETTING_BODY:
350             responseComplete();
351             break;
352         
353         case STATE_GETTING_CHUNKED:
354             processChunkHeader();
355             break;
356             
357         case STATE_GETTING_CHUNKED_BYTES:
358             setTerminator("\r\n");
359             state = STATE_GETTING_CHUNKED;
360             buffer.clear();
361             break;
362             
363
364         case STATE_GETTING_TRAILER:
365             processTrailer();
366             buffer.clear();
367             break;
368         
369         case STATE_IDLE:
370             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
371                 
372         default:
373             break;
374         }
375     }
376     
377     bool hasIdleTimeout() const
378     {
379         if (state != STATE_IDLE) {
380             return false;
381         }
382         
383         assert(sentRequests.empty());
384         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
385     }
386   
387     bool hasErrorTimeout() const
388     {
389       if (state == STATE_IDLE) {
390         return false;
391       }
392       
393       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
394     }
395     
396     bool hasError() const
397     {
398         return (state == STATE_SOCKET_ERROR);
399     }
400     
401     bool shouldStartNext() const
402     {
403       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
404     }
405     
406     bool isActive() const
407     {
408         return !queuedRequests.empty() || !sentRequests.empty();
409     }
410 private:
411     bool connectToHost()
412     {
413         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
414         
415         if (!open()) {
416             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
417             return false;
418         }
419         
420         if (connect(host.c_str(), port) != 0) {
421             return false;
422         }
423         
424         return true;
425     }
426     
427     
428     void processHeader()
429     {
430         string h = strutils::simplify(buffer);
431         if (h.empty()) { // blank line terminates headers
432             headersComplete();
433             return;
434         }
435               
436         int colonPos = buffer.find(':');
437         if (colonPos < 0) {
438             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
439             return;
440         }
441         
442         string key = strutils::simplify(buffer.substr(0, colonPos));
443         string lkey = boost::to_lower_copy(key);
444         string value = strutils::strip(buffer.substr(colonPos + 1));
445         
446         // only consider these if getting headers (as opposed to trailers 
447         // of a chunked transfer)
448         if (state == STATE_GETTING_HEADERS) {
449             if (lkey == "content-length") {
450
451                 int sz = strutils::to_int(value);
452                 if (bodyTransferSize <= 0) {
453                     bodyTransferSize = sz;
454                 }
455                 activeRequest->setResponseLength(sz);
456             } else if (lkey == "transfer-length") {
457                 bodyTransferSize = strutils::to_int(value);
458             } else if (lkey == "transfer-encoding") {
459                 processTransferEncoding(value);
460             } else if (lkey == "content-encoding") {
461                 _contentDecoder.setEncoding(value);
462             }
463         }
464     
465         activeRequest->responseHeader(lkey, value);
466     }
467     
468     void processTransferEncoding(const string& te)
469     {
470         if (te == "chunked") {
471             chunkedTransfer = true;
472         } else {
473             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
474             // failure
475         }
476     }
477     
478     void processChunkHeader()
479     {
480         if (buffer.empty()) {
481             // blank line after chunk data
482             return;
483         }
484                 
485         int chunkSize = 0;
486         int semiPos = buffer.find(';');
487         if (semiPos >= 0) {
488             // extensions ignored for the moment
489             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
490         } else {
491             chunkSize = strutils::to_int(buffer, 16);
492         }
493         
494         buffer.clear();
495         if (chunkSize == 0) {  //  trailer start
496             state = STATE_GETTING_TRAILER;
497             return;
498         }
499         
500         state = STATE_GETTING_CHUNKED_BYTES;
501         setByteCount(chunkSize);
502     }
503     
504     void processTrailer()
505     {        
506         if (buffer.empty()) {
507             // end of trailers
508             responseComplete();
509             return;
510         }
511         
512     // process as a normal header
513         processHeader();
514     }
515     
516     void headersComplete()
517     {
518         activeRequest->responseHeadersComplete();
519         _contentDecoder.initWithRequest(activeRequest);
520       
521         if (chunkedTransfer) {
522             state = STATE_GETTING_CHUNKED;
523         } else if (noMessageBody || (bodyTransferSize == 0)) {
524             // force the state to GETTING_BODY, to simplify logic in
525             // responseComplete and handleClose
526             state = STATE_GETTING_BODY;
527             responseComplete();
528         } else {
529             setByteCount(bodyTransferSize); // may be -1, that's fine
530             state = STATE_GETTING_BODY;
531         }
532     }
533     
534     void responseComplete()
535     {
536         Request_ptr completedRequest = activeRequest;        
537         _contentDecoder.finish();
538       
539         assert(sentRequests.front() == activeRequest);
540         sentRequests.pop_front();
541         bool doClose = activeRequest->closeAfterComplete();
542         activeRequest = NULL;
543       
544         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
545             if (doClose) {
546           // this will bring us into handleClose() above, which updates
547           // state to STATE_CLOSED
548               close();
549               
550           // if we have additional requests waiting, try to start them now
551               tryStartNextRequest();
552             }
553         }
554         
555         if (state != STATE_CLOSED)  {
556             state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
557         }
558         
559     // notify request after we change state, so this connection is idle
560     // if completion triggers other requests (which is likely)
561         //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
562         completedRequest->responseComplete();
563         client->requestFinished(this);
564         
565         setTerminator("\r\n");
566     }
567     
568     enum ConnectionState {
569         STATE_IDLE = 0,
570         STATE_WAITING_FOR_RESPONSE,
571         STATE_GETTING_HEADERS,
572         STATE_GETTING_BODY,
573         STATE_GETTING_CHUNKED,
574         STATE_GETTING_CHUNKED_BYTES,
575         STATE_GETTING_TRAILER,
576         STATE_SOCKET_ERROR,
577         STATE_CLOSED             ///< connection should be closed now
578     };
579     
580     Client* client;
581     Request_ptr activeRequest;
582     ConnectionState state;
583     string host;
584     short port;
585     std::string buffer;
586     int bodyTransferSize;
587     SGTimeStamp idleTime;
588     bool chunkedTransfer;
589     bool noMessageBody;
590     int requestBodyBytesToSend;
591     
592     RequestList queuedRequests;
593     RequestList sentRequests;
594     
595     ContentDecoder _contentDecoder;
596 };
597
598 Client::Client() :
599     d(new ClientPrivate)
600 {
601     d->proxyPort = 0;
602     d->maxConnections = 4;
603     d->bytesTransferred = 0;
604     d->lastTransferRate = 0;
605     d->timeTransferSample.stamp();
606     
607     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
608 }
609
610 Client::~Client()
611 {
612 }
613
614 void Client::setMaxConnections(unsigned int maxCon)
615 {
616     if (maxCon < 1) {
617         throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
618     }
619     
620     d->maxConnections = maxCon;
621 }
622
623 void Client::update(int waitTimeout)
624 {
625     d->poller.poll(waitTimeout);
626     bool waitingRequests = !d->pendingRequests.empty();
627     
628     ConnectionDict::iterator it = d->connections.begin();
629     for (; it != d->connections.end(); ) {
630         Connection* con = it->second;
631         if (con->hasIdleTimeout() || 
632             con->hasError() ||
633             con->hasErrorTimeout() ||
634             (!con->isActive() && waitingRequests))
635         {
636             if (con->hasErrorTimeout()) {
637                 // tell the connection we're timing it out
638                 con->handleTimeout();
639             }
640             
641         // connection has been idle for a while, clean it up
642         // (or if we have requests waiting for a different host,
643         // or an error condition
644             ConnectionDict::iterator del = it++;
645             delete del->second;
646             d->connections.erase(del);
647         } else {
648             if (it->second->shouldStartNext()) {
649                 it->second->tryStartNextRequest();
650             }
651             ++it;
652         }
653     } // of connection iteration
654     
655     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
656         RequestList waiting(d->pendingRequests);
657         d->pendingRequests.clear();
658         
659         // re-submit all waiting requests in order; this takes care of
660         // finding multiple pending items targetted to the same (new)
661         // connection
662         BOOST_FOREACH(Request_ptr req, waiting) {
663             makeRequest(req);
664         }
665     }
666 }
667
668 void Client::makeRequest(const Request_ptr& r)
669 {
670     if( r->url().find("://") == std::string::npos )
671       throw std::runtime_error("Unable to parse URL (missing scheme)");
672
673     string host = r->host();
674     int port = r->port();
675     if (!d->proxy.empty()) {
676         host = d->proxy;
677         port = d->proxyPort;
678     }
679     
680     Connection* con = NULL;
681     stringstream ss;
682     ss << host << "-" << port;
683     string connectionId = ss.str();
684     bool havePending = !d->pendingRequests.empty();
685     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
686     ConnectionDict::iterator consEnd = d->connections.end();
687      
688     // assign request to an existing Connection.
689     // various options exist here, examined in order
690     ConnectionDict::iterator it = d->connections.find(connectionId);
691     if (atConnectionsLimit && (it == consEnd)) {
692         // maximum number of connections active, queue this request
693         // when a connection goes inactive, we'll start this one            
694         d->pendingRequests.push_back(r);
695         return;
696     }
697     
698     // scan for an idle Connection to the same host (likely if we're
699     // retrieving multiple resources from the same host in quick succession)
700     // if we have pending requests (waiting for a free Connection), then
701     // force new requests on this id to always use the first Connection
702     // (instead of the random selection below). This ensures that when
703     // there's pressure on the number of connections to keep alive, one
704     // host can't DoS every other.
705     int count = 0;
706     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
707         if (havePending || !it->second->isActive()) {
708             con = it->second;
709             break;
710         }
711     }
712     
713     if (!con && atConnectionsLimit) {
714         // all current connections are busy (active), and we don't
715         // have free connections to allocate, so let's assign to
716         // an existing one randomly. Ideally we'd used whichever one will
717         // complete first but we don't have that info.
718         int index = rand() % count;
719         for (it = d->connections.find(connectionId); index > 0; --index) { ; }
720         con = it->second;
721     }
722     
723     // allocate a new connection object
724     if (!con) {
725         con = new Connection(this);
726         con->setServer(host, port);
727         d->poller.addChannel(con);
728         d->connections.insert(d->connections.end(), 
729             ConnectionDict::value_type(connectionId, con));
730     }
731     
732     con->queueRequest(r);
733 }
734
735 void Client::requestFinished(Connection* con)
736 {
737     
738 }
739
740 void Client::setUserAgent(const string& ua)
741 {
742     d->userAgent = ua;
743 }
744
745 const std::string& Client::userAgent() const
746 {
747     return d->userAgent;
748 }
749     
750 const std::string& Client::proxyHost() const
751 {
752     return d->proxy;
753 }
754     
755 const std::string& Client::proxyAuth() const
756 {
757     return d->proxyAuth;
758 }
759
760 void Client::setProxy(const string& proxy, int port, const string& auth)
761 {
762     d->proxy = proxy;
763     d->proxyPort = port;
764     d->proxyAuth = auth;
765 }
766
767 bool Client::hasActiveRequests() const
768 {
769     ConnectionDict::const_iterator it = d->connections.begin();
770     for (; it != d->connections.end(); ++it) {
771         if (it->second->isActive()) return true;
772     }
773     
774     return false;
775 }
776
777 void Client::receivedBytes(unsigned int count)
778 {
779     d->bytesTransferred += count;
780 }
781     
782 unsigned int Client::transferRateBytesPerSec() const
783 {
784     unsigned int e = d->timeTransferSample.elapsedMSec();
785     if (e < 400) {
786         // if called too frequently, return cahced value, to smooth out
787         // < 1 sec changes in flow
788         return d->lastTransferRate;
789     }
790     
791     unsigned int ratio = (d->bytesTransferred * 1000) / e;
792     d->timeTransferSample.stamp();
793     d->bytesTransferred = 0;
794     d->lastTransferRate = ratio;
795     return ratio;
796 }
797
798 } // of namespace HTTP
799
800 } // of namespace simgear