]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
Don't use object returned from vector::end()
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimGear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
71
72 class Connection;
73 typedef std::multimap<std::string, Connection*> ConnectionDict;
74 typedef std::list<Request_ptr> RequestList;
75
76 class Client::ClientPrivate
77 {
78 public:
79 #if defined(ENABLE_CURL)
80     CURLM* curlMulti;
81
82     void createCurlMulti()
83     {
84         curlMulti = curl_multi_init();
85         // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
86         // we request HTTP 1.1 pipelining
87         curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
88         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
89         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
90                           (long) maxPipelineDepth);
91         curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
92                           (long) maxHostConnections);
93
94
95     }
96
97     typedef std::map<Request_ptr, CURL*> RequestCurlMap;
98     RequestCurlMap requests;
99 #else
100     NetChannelPoller poller;
101 // connections by host (potentially more than one)
102     ConnectionDict connections;
103 #endif
104
105     std::string userAgent;
106     std::string proxy;
107     int proxyPort;
108     std::string proxyAuth;
109     unsigned int maxConnections;
110     unsigned int maxHostConnections;
111     unsigned int maxPipelineDepth;
112
113     RequestList pendingRequests;
114
115     SGTimeStamp timeTransferSample;
116     unsigned int bytesTransferred;
117     unsigned int lastTransferRate;
118     uint64_t totalBytesDownloaded;
119 };
120
121 #if !defined(ENABLE_CURL)
122 class Connection : public NetChat
123 {
124 public:
125     Connection(Client* pr, const std::string& conId) :
126         client(pr),
127         state(STATE_CLOSED),
128         port(DEFAULT_HTTP_PORT),
129         _connectionId(conId),
130         _maxPipelineLength(255)
131     {
132     }
133
134     virtual ~Connection()
135     {
136     }
137
138     virtual void handleBufferRead (NetBuffer& buffer)
139     {
140       if( !activeRequest || !activeRequest->isComplete() )
141         return NetChat::handleBufferRead(buffer);
142
143       // Request should be aborted (signaled by setting its state to complete).
144
145       // force the state to GETTING_BODY, to simplify logic in
146       // responseComplete and handleClose
147         setState(STATE_GETTING_BODY);
148       responseComplete();
149     }
150
151     void setServer(const std::string& h, short p)
152     {
153         host = h;
154         port = p;
155     }
156
157     void setMaxPipelineLength(unsigned int m)
158     {
159         _maxPipelineLength = m;
160     }
161
162     // socket-level errors
163     virtual void handleError(int error)
164     {
165         const char* errStr = strerror(error);
166         SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
167                << errStr << ")");
168
169         debugDumpRequests();
170
171         if (!activeRequest)
172         {
173         // connection level failure, eg name lookup or routing
174         // we won't have an active request yet, so let's fail all of the
175         // requests since we presume it's a systematic failure for
176         // the host in question
177             BOOST_FOREACH(Request_ptr req, sentRequests) {
178                 req->setFailure(error, errStr);
179             }
180
181             BOOST_FOREACH(Request_ptr req, queuedRequests) {
182                 req->setFailure(error, errStr);
183             }
184
185             sentRequests.clear();
186             queuedRequests.clear();
187         }
188
189         NetChat::handleError(error);
190         if (activeRequest) {
191             activeRequest->setFailure(error, errStr);
192             activeRequest = NULL;
193             _contentDecoder.reset();
194         }
195
196         setState(STATE_SOCKET_ERROR);
197     }
198
199     void handleTimeout()
200     {
201         handleError(ETIMEDOUT);
202     }
203
204     virtual void handleClose()
205     {
206         NetChat::handleClose();
207
208     // closing of the connection from the server side when getting the body,
209         bool canCloseState = (state == STATE_GETTING_BODY);
210         bool isCancelling = (state == STATE_CANCELLING);
211
212         if (canCloseState && activeRequest) {
213             // check bodyTransferSize matches how much we actually transferred
214             if (bodyTransferSize > 0) {
215                 if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
216                     SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
217                            << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
218                 }
219             }
220
221         // force state here, so responseComplete can avoid closing the
222         // socket again
223             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
224             setState(STATE_CLOSED);
225             responseComplete();
226         } else {
227             if (state == STATE_WAITING_FOR_RESPONSE) {
228                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
229                        << sentRequests.front()->url());
230                 assert(!sentRequests.empty());
231                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
232                 // no active request, but don't restore the front sent one
233                 sentRequests.erase(sentRequests.begin());
234             }
235
236             if (activeRequest && !isCancelling) {
237                 activeRequest->setFailure(500, "server closed connection");
238                 // remove the failed request from sentRequests, so it does
239                 // not get restored
240                 RequestList::iterator it = std::find(sentRequests.begin(),
241                     sentRequests.end(), activeRequest);
242                 if (it != sentRequests.end()) {
243                     sentRequests.erase(it);
244                 }
245                 activeRequest = NULL;
246                 _contentDecoder.reset();
247             }
248
249             setState(STATE_CLOSED);
250         }
251
252       if (sentRequests.empty()) {
253         return;
254       }
255
256     // restore sent requests to the queue, so they will be re-sent
257     // when the connection opens again
258       queuedRequests.insert(queuedRequests.begin(),
259                               sentRequests.begin(), sentRequests.end());
260       sentRequests.clear();
261     }
262
263     void queueRequest(const Request_ptr& r)
264     {
265         queuedRequests.push_back(r);
266         tryStartNextRequest();
267     }
268
269     void cancelRequest(const Request_ptr& r)
270     {
271         RequestList::iterator it = std::find(sentRequests.begin(),
272                                              sentRequests.end(), r);
273         if (it != sentRequests.end()) {
274             sentRequests.erase(it);
275
276             if ((r == activeRequest) || !activeRequest) {
277                 // either the cancelling request is active, or we're in waiting
278                 // for response state - close now
279                 setState(STATE_CANCELLING);
280                 close();
281
282                 setState(STATE_CLOSED);
283                 activeRequest = NULL;
284                 _contentDecoder.reset();
285             } else if (activeRequest) {
286                 SG_LOG(SG_IO, SG_INFO, "con:" << _connectionId << " cancelling non-active: " << r->url());
287
288                 // has been sent but not active, let the active finish and
289                 // then close. Otherwise cancelling request #2 would mess up
290                 // active transfer #1
291                 activeRequest->setCloseAfterComplete();
292             }
293         } // of request has been sent
294
295         // simpler case, not sent yet just remove from the queue
296         it = std::find(queuedRequests.begin(), queuedRequests.end(), r);
297         if (it != queuedRequests.end()) {
298             queuedRequests.erase(it);
299         }
300     }
301
302     void beginResponse()
303     {
304         assert(!sentRequests.empty());
305         assert(state == STATE_WAITING_FOR_RESPONSE);
306
307         activeRequest = sentRequests.front();
308         try {
309             SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
310             activeRequest->responseStart(buffer);
311         } catch (sg_exception& e) {
312             handleError(EIO);
313             return;
314         }
315
316       setState(STATE_GETTING_HEADERS);
317       buffer.clear();
318       if (activeRequest->responseCode() == 204) {
319         noMessageBody = true;
320       } else if (activeRequest->method() == "HEAD") {
321         noMessageBody = true;
322       } else {
323         noMessageBody = false;
324       }
325
326       bodyTransferSize = -1;
327       chunkedTransfer = false;
328       _contentDecoder.reset();
329     }
330
331     void tryStartNextRequest()
332     {
333       while( !queuedRequests.empty()
334           && queuedRequests.front()->isComplete() )
335         queuedRequests.pop_front();
336
337       if (queuedRequests.empty()) {
338         idleTime.stamp();
339         return;
340       }
341
342       if (sentRequests.size() >= _maxPipelineLength) {
343         return;
344       }
345
346       if (state == STATE_CLOSED) {
347           if (!connectToHost()) {
348               setState(STATE_SOCKET_ERROR);
349               return;
350           }
351
352           SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
353           setTerminator("\r\n");
354           setState(STATE_IDLE);
355       }
356
357       Request_ptr r = queuedRequests.front();
358       r->requestStart();
359
360       std::stringstream headerData;
361       std::string path = r->path();
362       assert(!path.empty());
363       std::string query = r->query();
364       std::string bodyData;
365
366       if (!client->proxyHost().empty()) {
367           path = r->scheme() + "://" + r->host() + r->path();
368       }
369
370       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
371           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
372           bodyData = query.substr(1); // URL-encode, drop the leading '?'
373           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
374           headerData << "Content-Length:" << bodyData.size() << "\r\n";
375       } else {
376           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
377           if( r->hasBodyData() )
378           {
379             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
380             headerData << "Content-Type:" << r->bodyType() << "\r\n";
381           }
382       }
383
384       headerData << "Host: " << r->hostAndPort() << "\r\n";
385       headerData << "User-Agent:" << client->userAgent() << "\r\n";
386       headerData << "Accept-Encoding: deflate, gzip\r\n";
387       if (!client->proxyAuth().empty()) {
388           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
389       }
390
391       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
392           headerData << h.first << ": " << h.second << "\r\n";
393       }
394
395       headerData << "\r\n"; // final CRLF to terminate the headers
396       if (!bodyData.empty()) {
397           headerData << bodyData;
398       }
399
400       bool ok = push(headerData.str().c_str());
401       if (!ok) {
402           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
403           // we've over-stuffed the socket, give up for now, let things
404           // drain down before trying to start any more requests.
405           return;
406       }
407
408       if( r->hasBodyData() )
409         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
410         {
411           char buf[4096];
412           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
413           if( len )
414           {
415             if( !bufferSend(buf, len) )
416             {
417               SG_LOG(SG_IO,
418                      SG_WARN,
419                      "overflow the HTTP::Connection output buffer");
420               state = STATE_SOCKET_ERROR;
421               return;
422             }
423             body_bytes_sent += len;
424           }
425           else
426           {
427             SG_LOG(SG_IO,
428                    SG_WARN,
429                    "HTTP asynchronous request body generation is unsupported");
430             break;
431           }
432         }
433
434         SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
435       // successfully sent, remove from queue, and maybe send the next
436       queuedRequests.pop_front();
437       sentRequests.push_back(r);
438       if (state == STATE_IDLE) {
439           setState(STATE_WAITING_FOR_RESPONSE);
440       }
441
442       // pipelining, let's maybe send the next request right away
443       tryStartNextRequest();
444     }
445
446     virtual void collectIncomingData(const char* s, int n)
447     {
448         idleTime.stamp();
449         client->receivedBytes(static_cast<unsigned int>(n));
450
451         if(   (state == STATE_GETTING_BODY)
452            || (state == STATE_GETTING_CHUNKED_BYTES) )
453           _contentDecoder.receivedBytes(s, n);
454         else
455           buffer.append(s, n);
456     }
457
458     virtual void foundTerminator(void)
459     {
460         idleTime.stamp();
461         switch (state) {
462         case STATE_WAITING_FOR_RESPONSE:
463             beginResponse();
464             break;
465
466         case STATE_GETTING_HEADERS:
467             processHeader();
468             buffer.clear();
469             break;
470
471         case STATE_GETTING_BODY:
472             responseComplete();
473             break;
474
475         case STATE_GETTING_CHUNKED:
476             processChunkHeader();
477             break;
478
479         case STATE_GETTING_CHUNKED_BYTES:
480             setTerminator("\r\n");
481             setState(STATE_GETTING_CHUNKED);
482             buffer.clear();
483             break;
484
485
486         case STATE_GETTING_TRAILER:
487             processTrailer();
488             buffer.clear();
489             break;
490
491         case STATE_IDLE:
492             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
493
494         default:
495             break;
496         }
497     }
498
499     bool hasIdleTimeout() const
500     {
501         if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
502             return false;
503         }
504
505         assert(sentRequests.empty());
506         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
507         return isTimedOut;
508     }
509
510     bool hasErrorTimeout() const
511     {
512       if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
513         return false;
514       }
515
516         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
517         return isTimedOut;
518     }
519
520     bool hasError() const
521     {
522         return (state == STATE_SOCKET_ERROR);
523     }
524
525     bool shouldStartNext() const
526     {
527       return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
528     }
529
530     bool isActive() const
531     {
532         return !queuedRequests.empty() || !sentRequests.empty();
533     }
534
535     std::string connectionId() const
536     {
537         return _connectionId;
538     }
539
540     void debugDumpRequests() const
541     {
542         SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
543                << "; state=" << state << ")");
544         if (activeRequest) {
545             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
546         } else {
547             SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
548         }
549
550         BOOST_FOREACH(Request_ptr req, sentRequests) {
551             SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
552         }
553
554         BOOST_FOREACH(Request_ptr req, queuedRequests) {
555             SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
556         }
557     }
558 private:
559     enum ConnectionState {
560         STATE_IDLE = 0,
561         STATE_WAITING_FOR_RESPONSE,
562         STATE_GETTING_HEADERS,
563         STATE_GETTING_BODY,
564         STATE_GETTING_CHUNKED,
565         STATE_GETTING_CHUNKED_BYTES,
566         STATE_GETTING_TRAILER,
567         STATE_SOCKET_ERROR,
568         STATE_CANCELLING,        ///< cancelling an acitve request
569         STATE_CLOSED             ///< connection should be closed now
570     };
571
572     void setState(ConnectionState newState)
573     {
574         if (state == newState) {
575             return;
576         }
577
578         state = newState;
579     }
580
581     bool connectToHost()
582     {
583         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
584
585         if (!open()) {
586             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
587             return false;
588         }
589
590         if (connect(host.c_str(), port) != 0) {
591             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
592             return false;
593         }
594
595         return true;
596     }
597
598
599     void processHeader()
600     {
601         std::string h = strutils::simplify(buffer);
602         if (h.empty()) { // blank line terminates headers
603             headersComplete();
604             return;
605         }
606
607         int colonPos = buffer.find(':');
608         if (colonPos < 0) {
609             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
610             return;
611         }
612
613         std::string key = strutils::simplify(buffer.substr(0, colonPos));
614         std::string lkey = boost::to_lower_copy(key);
615         std::string value = strutils::strip(buffer.substr(colonPos + 1));
616
617         // only consider these if getting headers (as opposed to trailers
618         // of a chunked transfer)
619         if (state == STATE_GETTING_HEADERS) {
620             if (lkey == "content-length") {
621
622                 int sz = strutils::to_int(value);
623                 if (bodyTransferSize <= 0) {
624                     bodyTransferSize = sz;
625                 }
626                 activeRequest->setResponseLength(sz);
627             } else if (lkey == "transfer-length") {
628                 bodyTransferSize = strutils::to_int(value);
629             } else if (lkey == "transfer-encoding") {
630                 processTransferEncoding(value);
631             } else if (lkey == "content-encoding") {
632                 _contentDecoder.setEncoding(value);
633             }
634         }
635
636         activeRequest->responseHeader(lkey, value);
637     }
638
639     void processTransferEncoding(const std::string& te)
640     {
641         if (te == "chunked") {
642             chunkedTransfer = true;
643         } else {
644             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
645             // failure
646         }
647     }
648
649     void processChunkHeader()
650     {
651         if (buffer.empty()) {
652             // blank line after chunk data
653             return;
654         }
655
656         int chunkSize = 0;
657         int semiPos = buffer.find(';');
658         if (semiPos >= 0) {
659             // extensions ignored for the moment
660             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
661         } else {
662             chunkSize = strutils::to_int(buffer, 16);
663         }
664
665         buffer.clear();
666         if (chunkSize == 0) {  //  trailer start
667             setState(STATE_GETTING_TRAILER);
668             return;
669         }
670
671         setState(STATE_GETTING_CHUNKED_BYTES);
672         setByteCount(chunkSize);
673     }
674
675     void processTrailer()
676     {
677         if (buffer.empty()) {
678             // end of trailers
679             responseComplete();
680             return;
681         }
682
683     // process as a normal header
684         processHeader();
685     }
686
687     void headersComplete()
688     {
689         activeRequest->responseHeadersComplete();
690         _contentDecoder.initWithRequest(activeRequest);
691
692         if (!activeRequest->serverSupportsPipelining()) {
693             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
694             _maxPipelineLength = 1;
695         }
696
697         if (chunkedTransfer) {
698             setState(STATE_GETTING_CHUNKED);
699         } else if (noMessageBody || (bodyTransferSize == 0)) {
700             // force the state to GETTING_BODY, to simplify logic in
701             // responseComplete and handleClose
702             setState(STATE_GETTING_BODY);
703             responseComplete();
704         } else {
705             setByteCount(bodyTransferSize); // may be -1, that's fine
706             setState(STATE_GETTING_BODY);
707         }
708     }
709
710     void responseComplete()
711     {
712         Request_ptr completedRequest = activeRequest;
713         _contentDecoder.finish();
714
715         assert(sentRequests.front() == activeRequest);
716         sentRequests.pop_front();
717         bool doClose = activeRequest->closeAfterComplete();
718         activeRequest = NULL;
719
720         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
721             if (doClose) {
722                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
723           // this will bring us into handleClose() above, which updates
724           // state to STATE_CLOSED
725               close();
726
727           // if we have additional requests waiting, try to start them now
728               tryStartNextRequest();
729             }
730         }
731
732         if (state != STATE_CLOSED)  {
733             setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
734         }
735
736     // notify request after we change state, so this connection is idle
737     // if completion triggers other requests (which is likely)
738         completedRequest->responseComplete();
739         client->requestFinished(this);
740
741         setTerminator("\r\n");
742     }
743
744     Client* client;
745     Request_ptr activeRequest;
746     ConnectionState state;
747     std::string host;
748     short port;
749     std::string buffer;
750     int bodyTransferSize;
751     SGTimeStamp idleTime;
752     bool chunkedTransfer;
753     bool noMessageBody;
754
755     RequestList queuedRequests;
756     RequestList sentRequests;
757
758     ContentDecoder _contentDecoder;
759     std::string _connectionId;
760     unsigned int _maxPipelineLength;
761 };
762 #endif // of !ENABLE_CURL
763
764 Client::Client() :
765     d(new ClientPrivate)
766 {
767     d->proxyPort = 0;
768     d->maxConnections = 4;
769     d->maxHostConnections = 4;
770     d->bytesTransferred = 0;
771     d->lastTransferRate = 0;
772     d->timeTransferSample.stamp();
773     d->totalBytesDownloaded = 0;
774     d->maxPipelineDepth = 5;
775     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
776 #if defined(ENABLE_CURL)
777     static bool didInitCurlGlobal = false;
778     if (!didInitCurlGlobal) {
779       curl_global_init(CURL_GLOBAL_ALL);
780       didInitCurlGlobal = true;
781     }
782
783     d->createCurlMulti();
784 #endif
785 }
786
787 Client::~Client()
788 {
789 #if defined(ENABLE_CURL)
790   curl_multi_cleanup(d->curlMulti);
791 #endif
792 }
793
794 void Client::setMaxConnections(unsigned int maxCon)
795 {
796     d->maxConnections = maxCon;
797 #if defined(ENABLE_CURL)
798     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
799 #endif
800 }
801
802 void Client::setMaxHostConnections(unsigned int maxHostCon)
803 {
804     d->maxHostConnections = maxHostCon;
805 #if defined(ENABLE_CURL)
806     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
807 #endif
808 }
809
810 void Client::setMaxPipelineDepth(unsigned int depth)
811 {
812     d->maxPipelineDepth = depth;
813 #if defined(ENABLE_CURL)
814     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
815 #else
816     ConnectionDict::iterator it = d->connections.begin();
817     for (; it != d->connections.end(); ) {
818         it->second->setMaxPipelineLength(depth);
819     }
820 #endif
821 }
822
823 void Client::update(int waitTimeout)
824 {
825 #if defined(ENABLE_CURL)
826     int remainingActive, messagesInQueue;
827     curl_multi_perform(d->curlMulti, &remainingActive);
828
829     CURLMsg* msg;
830     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
831       if (msg->msg == CURLMSG_DONE) {
832         Request* rawReq = 0;
833         CURL *e = msg->easy_handle;
834         curl_easy_getinfo(e, CURLINFO_PRIVATE, &rawReq);
835
836         // ensure request stays valid for the moment
837         // eg if responseComplete cancels us
838         Request_ptr req(rawReq);
839
840         long responseCode;
841         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
842
843           // remove from the requests map now,
844           // in case the callbacks perform a cancel. We'll use
845           // the absence from the request dict in cancel to avoid
846           // a double remove
847           ClientPrivate::RequestCurlMap::iterator it = d->requests.find(req);
848           assert(it != d->requests.end());
849           assert(it->second == e);
850           d->requests.erase(it);
851
852         if (msg->data.result == 0) {
853           req->responseComplete();
854         } else {
855           SG_LOG(SG_IO, SG_WARN, "CURL Result:" << msg->data.result << " " << curl_easy_strerror(msg->data.result));
856           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
857         }
858
859         curl_multi_remove_handle(d->curlMulti, e);
860         curl_easy_cleanup(e);
861       } else {
862           // should never happen since CURLMSG_DONE is the only code
863           // defined!
864           SG_LOG(SG_IO, SG_ALERT, "unknown CurlMSG:" << msg->msg);
865       }
866     } // of curl message processing loop
867     SGTimeStamp::sleepForMSec(waitTimeout);
868 #else
869     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
870         SGTimeStamp::sleepForMSec(waitTimeout);
871     } else {
872         d->poller.poll(waitTimeout);
873     }
874
875     bool waitingRequests = !d->pendingRequests.empty();
876     ConnectionDict::iterator it = d->connections.begin();
877     for (; it != d->connections.end(); ) {
878         Connection* con = it->second;
879         if (con->hasIdleTimeout() ||
880             con->hasError() ||
881             con->hasErrorTimeout() ||
882             (!con->isActive() && waitingRequests))
883         {
884             if (con->hasErrorTimeout()) {
885                 // tell the connection we're timing it out
886                 con->handleTimeout();
887             }
888
889         // connection has been idle for a while, clean it up
890         // (or if we have requests waiting for a different host,
891         // or an error condition
892             ConnectionDict::iterator del = it++;
893             delete del->second;
894             d->connections.erase(del);
895         } else {
896             if (it->second->shouldStartNext()) {
897                 it->second->tryStartNextRequest();
898             }
899             ++it;
900         }
901     } // of connection iteration
902
903     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
904         RequestList waiting(d->pendingRequests);
905         d->pendingRequests.clear();
906
907         // re-submit all waiting requests in order; this takes care of
908         // finding multiple pending items targetted to the same (new)
909         // connection
910         BOOST_FOREACH(Request_ptr req, waiting) {
911             makeRequest(req);
912         }
913     }
914 #endif
915 }
916
917 void Client::makeRequest(const Request_ptr& r)
918 {
919     if( r->isComplete() )
920       return;
921
922     if( r->url().find("://") == std::string::npos ) {
923         r->setFailure(EINVAL, "malformed URL");
924         return;
925     }
926
927     r->_client = this;
928
929 #if defined(ENABLE_CURL)
930     ClientPrivate::RequestCurlMap::iterator rit = d->requests.find(r);
931     assert(rit == d->requests.end());
932
933     CURL* curlRequest = curl_easy_init();
934     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
935
936     d->requests[r] = curlRequest;
937
938     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
939     // disable built-in libCurl progress feedback
940     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
941
942     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
943     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
944     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
945     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
946
947     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
948     curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
949
950     if (!d->proxy.empty()) {
951       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
952       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
953
954       if (!d->proxyAuth.empty()) {
955         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
956         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
957       }
958     }
959
960     std::string method = boost::to_lower_copy(r->method());
961     if (method == "get") {
962       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
963     } else if (method == "put") {
964       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
965       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
966     } else if (method == "post") {
967       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
968       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
969
970       std::string q = r->query().substr(1);
971       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
972
973       // reset URL to exclude query pieces
974       std::string urlWithoutQuery = r->url();
975       std::string::size_type queryPos = urlWithoutQuery.find('?');
976       urlWithoutQuery.resize(queryPos);
977       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
978     } else {
979       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
980     }
981
982     struct curl_slist* headerList = NULL;
983     if (r->hasBodyData() && (method != "post")) {
984       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
985       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
986       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
987       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
988       std::string h = "Content-Type:" + r->bodyType();
989       headerList = curl_slist_append(headerList, h.c_str());
990     }
991
992     StringMap::const_iterator it;
993     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
994       std::string h = it->first + ": " + it->second;
995       headerList = curl_slist_append(headerList, h.c_str());
996     }
997
998     if (headerList != NULL) {
999       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
1000     }
1001
1002     curl_multi_add_handle(d->curlMulti, curlRequest);
1003
1004 // this seems premature, but we don't have a callback from Curl we could
1005 // use to trigger when the requst is actually sent.
1006     r->requestStart();
1007
1008 #else
1009     if( r->url().find("http://") != 0 ) {
1010         r->setFailure(EINVAL, "only HTTP protocol is supported");
1011         return;
1012     }
1013
1014     std::string host = r->host();
1015     int port = r->port();
1016     if (!d->proxy.empty()) {
1017         host = d->proxy;
1018         port = d->proxyPort;
1019     }
1020
1021     Connection* con = NULL;
1022     std::stringstream ss;
1023     ss << host << "-" << port;
1024     std::string connectionId = ss.str();
1025     bool havePending = !d->pendingRequests.empty();
1026     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
1027     ConnectionDict::iterator consEnd = d->connections.end();
1028
1029     // assign request to an existing Connection.
1030     // various options exist here, examined in order
1031     ConnectionDict::iterator it = d->connections.find(connectionId);
1032     if (atConnectionsLimit && (it == consEnd)) {
1033         // maximum number of connections active, queue this request
1034         // when a connection goes inactive, we'll start this one
1035         d->pendingRequests.push_back(r);
1036         return;
1037     }
1038
1039     // scan for an idle Connection to the same host (likely if we're
1040     // retrieving multiple resources from the same host in quick succession)
1041     // if we have pending requests (waiting for a free Connection), then
1042     // force new requests on this id to always use the first Connection
1043     // (instead of the random selection below). This ensures that when
1044     // there's pressure on the number of connections to keep alive, one
1045     // host can't DoS every other.
1046     int count = 0;
1047     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
1048         if (havePending || !it->second->isActive()) {
1049             con = it->second;
1050             break;
1051         }
1052     }
1053
1054     bool atHostConnectionsLimit = (count >= d->maxHostConnections);
1055
1056     if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
1057         // all current connections are busy (active), and we don't
1058         // have free connections to allocate, so let's assign to
1059         // an existing one randomly. Ideally we'd used whichever one will
1060         // complete first but we don't have that info.
1061         int index = rand() % count;
1062         for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
1063         con = it->second;
1064     }
1065
1066     // allocate a new connection object
1067     if (!con) {
1068         static int connectionSuffx = 0;
1069
1070         std::stringstream ss;
1071         ss << connectionId << "-" << connectionSuffx++;
1072
1073         SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
1074         con = new Connection(this, ss.str());
1075         con->setServer(host, port);
1076         con->setMaxPipelineLength(d->maxPipelineDepth);
1077         d->poller.addChannel(con);
1078         d->connections.insert(d->connections.end(),
1079             ConnectionDict::value_type(connectionId, con));
1080     }
1081
1082     SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
1083     con->queueRequest(r);
1084 #endif
1085 }
1086
1087 void Client::cancelRequest(const Request_ptr &r, std::string reason)
1088 {
1089 #if defined(ENABLE_CURL)
1090     ClientPrivate::RequestCurlMap::iterator it = d->requests.find(r);
1091     if(it == d->requests.end()) {
1092         // already being removed, presumably inside ::update()
1093         // nothing more to do
1094         return;
1095     }
1096
1097     CURLMcode err = curl_multi_remove_handle(d->curlMulti, it->second);
1098     assert(err == CURLM_OK);
1099
1100     // clear the request pointer form the curl-easy object
1101     curl_easy_setopt(it->second, CURLOPT_PRIVATE, 0);
1102
1103     curl_easy_cleanup(it->second);
1104     d->requests.erase(it);
1105 #else
1106     ConnectionDict::iterator it = d->connections.begin();
1107     for (; it != d->connections.end(); ++it) {
1108         (it->second)->cancelRequest(r);
1109     }
1110 #endif
1111     r->setFailure(-1, reason);
1112 }
1113
1114 //------------------------------------------------------------------------------
1115 FileRequestRef Client::save( const std::string& url,
1116                              const std::string& filename )
1117 {
1118   FileRequestRef req = new FileRequest(url, filename);
1119   makeRequest(req);
1120   return req;
1121 }
1122
1123 //------------------------------------------------------------------------------
1124 MemoryRequestRef Client::load(const std::string& url)
1125 {
1126   MemoryRequestRef req = new MemoryRequest(url);
1127   makeRequest(req);
1128   return req;
1129 }
1130
1131 void Client::requestFinished(Connection* con)
1132 {
1133
1134 }
1135
1136 void Client::setUserAgent(const std::string& ua)
1137 {
1138     d->userAgent = ua;
1139 }
1140
1141 const std::string& Client::userAgent() const
1142 {
1143     return d->userAgent;
1144 }
1145
1146 const std::string& Client::proxyHost() const
1147 {
1148     return d->proxy;
1149 }
1150
1151 const std::string& Client::proxyAuth() const
1152 {
1153     return d->proxyAuth;
1154 }
1155
1156 void Client::setProxy( const std::string& proxy,
1157                        int port,
1158                        const std::string& auth )
1159 {
1160     d->proxy = proxy;
1161     d->proxyPort = port;
1162     d->proxyAuth = auth;
1163 }
1164
1165 bool Client::hasActiveRequests() const
1166 {
1167   #if defined(ENABLE_CURL)
1168     return !d->requests.empty();
1169   #else
1170     ConnectionDict::const_iterator it = d->connections.begin();
1171     for (; it != d->connections.end(); ++it) {
1172         if (it->second->isActive()) return true;
1173     }
1174
1175     return false;
1176 #endif
1177 }
1178
1179 void Client::receivedBytes(unsigned int count)
1180 {
1181     d->bytesTransferred += count;
1182     d->totalBytesDownloaded += count;
1183 }
1184
1185 unsigned int Client::transferRateBytesPerSec() const
1186 {
1187     unsigned int e = d->timeTransferSample.elapsedMSec();
1188     if (e > 400) {
1189         // too long a window, ignore
1190         d->timeTransferSample.stamp();
1191         d->bytesTransferred = 0;
1192         d->lastTransferRate = 0;
1193         return 0;
1194     }
1195
1196     if (e < 100) { // avoid really narrow windows
1197         return d->lastTransferRate;
1198     }
1199
1200     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1201     // run a low-pass filter
1202     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1203     smoothed /= 400;
1204
1205     d->timeTransferSample.stamp();
1206     d->bytesTransferred = 0;
1207     d->lastTransferRate = smoothed;
1208     return smoothed;
1209 }
1210
1211 uint64_t Client::totalBytesDownloaded() const
1212 {
1213     return d->totalBytesDownloaded;
1214 }
1215
1216 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1217 {
1218   size_t byteSize = size * nmemb;
1219   Request* req = static_cast<Request*>(userdata);
1220   req->processBodyBytes(ptr, byteSize);
1221
1222   Client* cl = req->http();
1223   if (cl) {
1224     cl->receivedBytes(byteSize);
1225   }
1226
1227   return byteSize;
1228 }
1229
1230 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1231 {
1232   size_t maxBytes = size * nmemb;
1233   Request* req = static_cast<Request*>(userdata);
1234   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1235   return actualBytes;
1236 }
1237
1238 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1239 {
1240   size_t byteSize = size * nitems;
1241   Request* req = static_cast<Request*>(userdata);
1242   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1243
1244   if (req->readyState() == HTTP::Request::OPENED) {
1245     req->responseStart(h);
1246     return byteSize;
1247   }
1248
1249   if (h.empty()) {
1250       // got a 100-continue reponse; restart
1251       if (req->responseCode() == 100) {
1252           req->setReadyState(HTTP::Request::OPENED);
1253           return byteSize;
1254       }
1255
1256     req->responseHeadersComplete();
1257     return byteSize;
1258   }
1259
1260   if (req->responseCode() == 100) {
1261       return byteSize; // skip headers associated with 100-continue status
1262   }
1263
1264   size_t colonPos = h.find(':');
1265   if (colonPos == std::string::npos) {
1266       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1267       return byteSize;
1268   }
1269
1270   std::string key = strutils::simplify(h.substr(0, colonPos));
1271   std::string lkey = boost::to_lower_copy(key);
1272   std::string value = strutils::strip(h.substr(colonPos + 1));
1273
1274   req->responseHeader(lkey, value);
1275   return byteSize;
1276 }
1277
1278 void Client::debugDumpRequests()
1279 {
1280 #if defined(ENABLE_CURL)
1281     SG_LOG(SG_IO, SG_INFO, "== HTTP request dump");
1282     ClientPrivate::RequestCurlMap::iterator it = d->requests.begin();
1283     for (; it != d->requests.end(); ++it) {
1284         SG_LOG(SG_IO, SG_INFO, "\t" << it->first->url());
1285     }
1286     SG_LOG(SG_IO, SG_INFO, "==");
1287 #else
1288     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1289     ConnectionDict::iterator it = d->connections.begin();
1290     for (; it != d->connections.end(); ++it) {
1291         it->second->debugDumpRequests();
1292     }
1293     SG_LOG(SG_IO, SG_INFO, "==");
1294 #endif
1295 }
1296
1297 void Client::clearAllConnections()
1298 {
1299 #if defined(ENABLE_CURL)
1300     curl_multi_cleanup(d->curlMulti);
1301     d->createCurlMulti();
1302 #else
1303     ConnectionDict::iterator it = d->connections.begin();
1304     for (; it != d->connections.end(); ++it) {
1305         delete it->second;
1306     }
1307     d->connections.clear();
1308 #endif
1309 }
1310
1311 } // of namespace HTTP
1312
1313 } // of namespace simgear