]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
5ec7386b724aaf232fb59dd2d89adac48b4f522a
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimGear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
71
72 class Connection;
73 typedef std::multimap<std::string, Connection*> ConnectionDict;
74 typedef std::list<Request_ptr> RequestList;
75
76 class Client::ClientPrivate
77 {
78 public:
79 #if defined(ENABLE_CURL)
80     CURLM* curlMulti;
81     bool haveActiveRequests;
82
83     void createCurlMulti()
84     {
85         curlMulti = curl_multi_init();
86         // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
87         // we request HTTP 1.1 pipelining
88         curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
89         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
90         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
91                           (long) maxPipelineDepth);
92         curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
93                           (long) maxHostConnections);
94
95
96     }
97 #else
98     NetChannelPoller poller;
99 // connections by host (potentially more than one)
100     ConnectionDict connections;
101 #endif
102
103     std::string userAgent;
104     std::string proxy;
105     int proxyPort;
106     std::string proxyAuth;
107     unsigned int maxConnections;
108     unsigned int maxHostConnections;
109     unsigned int maxPipelineDepth;
110
111     RequestList pendingRequests;
112
113     SGTimeStamp timeTransferSample;
114     unsigned int bytesTransferred;
115     unsigned int lastTransferRate;
116     uint64_t totalBytesDownloaded;
117 };
118
119 #if !defined(ENABLE_CURL)
120 class Connection : public NetChat
121 {
122 public:
123     Connection(Client* pr, const std::string& conId) :
124         client(pr),
125         state(STATE_CLOSED),
126         port(DEFAULT_HTTP_PORT),
127         _connectionId(conId),
128         _maxPipelineLength(255)
129     {
130     }
131
132     virtual ~Connection()
133     {
134     }
135
136     virtual void handleBufferRead (NetBuffer& buffer)
137     {
138       if( !activeRequest || !activeRequest->isComplete() )
139         return NetChat::handleBufferRead(buffer);
140
141       // Request should be aborted (signaled by setting its state to complete).
142
143       // force the state to GETTING_BODY, to simplify logic in
144       // responseComplete and handleClose
145         setState(STATE_GETTING_BODY);
146       responseComplete();
147     }
148
149     void setServer(const std::string& h, short p)
150     {
151         host = h;
152         port = p;
153     }
154
155     void setMaxPipelineLength(unsigned int m)
156     {
157         _maxPipelineLength = m;
158     }
159
160     // socket-level errors
161     virtual void handleError(int error)
162     {
163         const char* errStr = strerror(error);
164         SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
165                << errStr << ")");
166
167         debugDumpRequests();
168
169         if (!activeRequest)
170         {
171         // connection level failure, eg name lookup or routing
172         // we won't have an active request yet, so let's fail all of the
173         // requests since we presume it's a systematic failure for
174         // the host in question
175             BOOST_FOREACH(Request_ptr req, sentRequests) {
176                 req->setFailure(error, errStr);
177             }
178
179             BOOST_FOREACH(Request_ptr req, queuedRequests) {
180                 req->setFailure(error, errStr);
181             }
182
183             sentRequests.clear();
184             queuedRequests.clear();
185         }
186
187         NetChat::handleError(error);
188         if (activeRequest) {
189             activeRequest->setFailure(error, errStr);
190             activeRequest = NULL;
191             _contentDecoder.reset();
192         }
193
194         setState(STATE_SOCKET_ERROR);
195     }
196
197     void handleTimeout()
198     {
199         handleError(ETIMEDOUT);
200     }
201
202     virtual void handleClose()
203     {
204         NetChat::handleClose();
205
206     // closing of the connection from the server side when getting the body,
207         bool canCloseState = (state == STATE_GETTING_BODY);
208         if (canCloseState && activeRequest) {
209             // check bodyTransferSize matches how much we actually transferred
210             if (bodyTransferSize > 0) {
211                 if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
212                     SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
213                            << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
214                 }
215             }
216
217         // force state here, so responseComplete can avoid closing the
218         // socket again
219             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
220             setState(STATE_CLOSED);
221             responseComplete();
222         } else {
223             if (state == STATE_WAITING_FOR_RESPONSE) {
224                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
225                        << sentRequests.front()->url());
226                 assert(!sentRequests.empty());
227                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
228                 // no active request, but don't restore the front sent one
229                 sentRequests.erase(sentRequests.begin());
230             }
231
232             if (activeRequest) {
233                 activeRequest->setFailure(500, "server closed connection");
234                 // remove the failed request from sentRequests, so it does
235                 // not get restored
236                 RequestList::iterator it = std::find(sentRequests.begin(),
237                     sentRequests.end(), activeRequest);
238                 if (it != sentRequests.end()) {
239                     sentRequests.erase(it);
240                 }
241                 activeRequest = NULL;
242                 _contentDecoder.reset();
243             }
244
245             setState(STATE_CLOSED);
246         }
247
248       if (sentRequests.empty()) {
249         return;
250       }
251
252     // restore sent requests to the queue, so they will be re-sent
253     // when the connection opens again
254       queuedRequests.insert(queuedRequests.begin(),
255                               sentRequests.begin(), sentRequests.end());
256       sentRequests.clear();
257     }
258
259     void queueRequest(const Request_ptr& r)
260     {
261         queuedRequests.push_back(r);
262         tryStartNextRequest();
263     }
264
265     void beginResponse()
266     {
267         assert(!sentRequests.empty());
268         assert(state == STATE_WAITING_FOR_RESPONSE);
269
270         activeRequest = sentRequests.front();
271         try {
272             SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
273             activeRequest->responseStart(buffer);
274         } catch (sg_exception& e) {
275             handleError(EIO);
276             return;
277         }
278
279       setState(STATE_GETTING_HEADERS);
280       buffer.clear();
281       if (activeRequest->responseCode() == 204) {
282         noMessageBody = true;
283       } else if (activeRequest->method() == "HEAD") {
284         noMessageBody = true;
285       } else {
286         noMessageBody = false;
287       }
288
289       bodyTransferSize = -1;
290       chunkedTransfer = false;
291       _contentDecoder.reset();
292     }
293
294     void tryStartNextRequest()
295     {
296       while( !queuedRequests.empty()
297           && queuedRequests.front()->isComplete() )
298         queuedRequests.pop_front();
299
300       if (queuedRequests.empty()) {
301         idleTime.stamp();
302         return;
303       }
304
305       if (sentRequests.size() >= _maxPipelineLength) {
306         return;
307       }
308
309       if (state == STATE_CLOSED) {
310           if (!connectToHost()) {
311               setState(STATE_SOCKET_ERROR);
312               return;
313           }
314
315           SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
316           setTerminator("\r\n");
317           setState(STATE_IDLE);
318       }
319
320       Request_ptr r = queuedRequests.front();
321       r->requestStart();
322
323       std::stringstream headerData;
324       std::string path = r->path();
325       assert(!path.empty());
326       std::string query = r->query();
327       std::string bodyData;
328
329       if (!client->proxyHost().empty()) {
330           path = r->scheme() + "://" + r->host() + r->path();
331       }
332
333       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
334           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
335           bodyData = query.substr(1); // URL-encode, drop the leading '?'
336           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
337           headerData << "Content-Length:" << bodyData.size() << "\r\n";
338       } else {
339           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
340           if( r->hasBodyData() )
341           {
342             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
343             headerData << "Content-Type:" << r->bodyType() << "\r\n";
344           }
345       }
346
347       headerData << "Host: " << r->hostAndPort() << "\r\n";
348       headerData << "User-Agent:" << client->userAgent() << "\r\n";
349       headerData << "Accept-Encoding: deflate, gzip\r\n";
350       if (!client->proxyAuth().empty()) {
351           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
352       }
353
354       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
355           headerData << h.first << ": " << h.second << "\r\n";
356       }
357
358       headerData << "\r\n"; // final CRLF to terminate the headers
359       if (!bodyData.empty()) {
360           headerData << bodyData;
361       }
362
363       bool ok = push(headerData.str().c_str());
364       if (!ok) {
365           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
366           // we've over-stuffed the socket, give up for now, let things
367           // drain down before trying to start any more requests.
368           return;
369       }
370
371       if( r->hasBodyData() )
372         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
373         {
374           char buf[4096];
375           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
376           if( len )
377           {
378             if( !bufferSend(buf, len) )
379             {
380               SG_LOG(SG_IO,
381                      SG_WARN,
382                      "overflow the HTTP::Connection output buffer");
383               state = STATE_SOCKET_ERROR;
384               return;
385             }
386             body_bytes_sent += len;
387           }
388           else
389           {
390             SG_LOG(SG_IO,
391                    SG_WARN,
392                    "HTTP asynchronous request body generation is unsupported");
393             break;
394           }
395         }
396
397         SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
398       // successfully sent, remove from queue, and maybe send the next
399       queuedRequests.pop_front();
400       sentRequests.push_back(r);
401       if (state == STATE_IDLE) {
402           setState(STATE_WAITING_FOR_RESPONSE);
403       }
404
405       // pipelining, let's maybe send the next request right away
406       tryStartNextRequest();
407     }
408
409     virtual void collectIncomingData(const char* s, int n)
410     {
411         idleTime.stamp();
412         client->receivedBytes(static_cast<unsigned int>(n));
413
414         if(   (state == STATE_GETTING_BODY)
415            || (state == STATE_GETTING_CHUNKED_BYTES) )
416           _contentDecoder.receivedBytes(s, n);
417         else
418           buffer.append(s, n);
419     }
420
421     virtual void foundTerminator(void)
422     {
423         idleTime.stamp();
424         switch (state) {
425         case STATE_WAITING_FOR_RESPONSE:
426             beginResponse();
427             break;
428
429         case STATE_GETTING_HEADERS:
430             processHeader();
431             buffer.clear();
432             break;
433
434         case STATE_GETTING_BODY:
435             responseComplete();
436             break;
437
438         case STATE_GETTING_CHUNKED:
439             processChunkHeader();
440             break;
441
442         case STATE_GETTING_CHUNKED_BYTES:
443             setTerminator("\r\n");
444             setState(STATE_GETTING_CHUNKED);
445             buffer.clear();
446             break;
447
448
449         case STATE_GETTING_TRAILER:
450             processTrailer();
451             buffer.clear();
452             break;
453
454         case STATE_IDLE:
455             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
456
457         default:
458             break;
459         }
460     }
461
462     bool hasIdleTimeout() const
463     {
464         if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
465             return false;
466         }
467
468         assert(sentRequests.empty());
469         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
470         return isTimedOut;
471     }
472
473     bool hasErrorTimeout() const
474     {
475       if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
476         return false;
477       }
478
479         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
480         return isTimedOut;
481     }
482
483     bool hasError() const
484     {
485         return (state == STATE_SOCKET_ERROR);
486     }
487
488     bool shouldStartNext() const
489     {
490       return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
491     }
492
493     bool isActive() const
494     {
495         return !queuedRequests.empty() || !sentRequests.empty();
496     }
497
498     std::string connectionId() const
499     {
500         return _connectionId;
501     }
502
503     void debugDumpRequests() const
504     {
505         SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
506                << "; state=" << state << ")");
507         if (activeRequest) {
508             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
509         } else {
510             SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
511         }
512
513         BOOST_FOREACH(Request_ptr req, sentRequests) {
514             SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
515         }
516
517         BOOST_FOREACH(Request_ptr req, queuedRequests) {
518             SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
519         }
520     }
521 private:
522     enum ConnectionState {
523         STATE_IDLE = 0,
524         STATE_WAITING_FOR_RESPONSE,
525         STATE_GETTING_HEADERS,
526         STATE_GETTING_BODY,
527         STATE_GETTING_CHUNKED,
528         STATE_GETTING_CHUNKED_BYTES,
529         STATE_GETTING_TRAILER,
530         STATE_SOCKET_ERROR,
531         STATE_CLOSED             ///< connection should be closed now
532     };
533
534     void setState(ConnectionState newState)
535     {
536         if (state == newState) {
537             return;
538         }
539
540         state = newState;
541     }
542
543     bool connectToHost()
544     {
545         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
546
547         if (!open()) {
548             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
549             return false;
550         }
551
552         if (connect(host.c_str(), port) != 0) {
553             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
554             return false;
555         }
556
557         return true;
558     }
559
560
561     void processHeader()
562     {
563         std::string h = strutils::simplify(buffer);
564         if (h.empty()) { // blank line terminates headers
565             headersComplete();
566             return;
567         }
568
569         int colonPos = buffer.find(':');
570         if (colonPos < 0) {
571             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
572             return;
573         }
574
575         std::string key = strutils::simplify(buffer.substr(0, colonPos));
576         std::string lkey = boost::to_lower_copy(key);
577         std::string value = strutils::strip(buffer.substr(colonPos + 1));
578
579         // only consider these if getting headers (as opposed to trailers
580         // of a chunked transfer)
581         if (state == STATE_GETTING_HEADERS) {
582             if (lkey == "content-length") {
583
584                 int sz = strutils::to_int(value);
585                 if (bodyTransferSize <= 0) {
586                     bodyTransferSize = sz;
587                 }
588                 activeRequest->setResponseLength(sz);
589             } else if (lkey == "transfer-length") {
590                 bodyTransferSize = strutils::to_int(value);
591             } else if (lkey == "transfer-encoding") {
592                 processTransferEncoding(value);
593             } else if (lkey == "content-encoding") {
594                 _contentDecoder.setEncoding(value);
595             }
596         }
597
598         activeRequest->responseHeader(lkey, value);
599     }
600
601     void processTransferEncoding(const std::string& te)
602     {
603         if (te == "chunked") {
604             chunkedTransfer = true;
605         } else {
606             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
607             // failure
608         }
609     }
610
611     void processChunkHeader()
612     {
613         if (buffer.empty()) {
614             // blank line after chunk data
615             return;
616         }
617
618         int chunkSize = 0;
619         int semiPos = buffer.find(';');
620         if (semiPos >= 0) {
621             // extensions ignored for the moment
622             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
623         } else {
624             chunkSize = strutils::to_int(buffer, 16);
625         }
626
627         buffer.clear();
628         if (chunkSize == 0) {  //  trailer start
629             setState(STATE_GETTING_TRAILER);
630             return;
631         }
632
633         setState(STATE_GETTING_CHUNKED_BYTES);
634         setByteCount(chunkSize);
635     }
636
637     void processTrailer()
638     {
639         if (buffer.empty()) {
640             // end of trailers
641             responseComplete();
642             return;
643         }
644
645     // process as a normal header
646         processHeader();
647     }
648
649     void headersComplete()
650     {
651         activeRequest->responseHeadersComplete();
652         _contentDecoder.initWithRequest(activeRequest);
653
654         if (!activeRequest->serverSupportsPipelining()) {
655             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
656             _maxPipelineLength = 1;
657         }
658
659         if (chunkedTransfer) {
660             setState(STATE_GETTING_CHUNKED);
661         } else if (noMessageBody || (bodyTransferSize == 0)) {
662             // force the state to GETTING_BODY, to simplify logic in
663             // responseComplete and handleClose
664             setState(STATE_GETTING_BODY);
665             responseComplete();
666         } else {
667             setByteCount(bodyTransferSize); // may be -1, that's fine
668             setState(STATE_GETTING_BODY);
669         }
670     }
671
672     void responseComplete()
673     {
674         Request_ptr completedRequest = activeRequest;
675         _contentDecoder.finish();
676
677         assert(sentRequests.front() == activeRequest);
678         sentRequests.pop_front();
679         bool doClose = activeRequest->closeAfterComplete();
680         activeRequest = NULL;
681
682         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
683             if (doClose) {
684                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
685           // this will bring us into handleClose() above, which updates
686           // state to STATE_CLOSED
687               close();
688
689           // if we have additional requests waiting, try to start them now
690               tryStartNextRequest();
691             }
692         }
693
694         if (state != STATE_CLOSED)  {
695             setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
696         }
697
698     // notify request after we change state, so this connection is idle
699     // if completion triggers other requests (which is likely)
700         completedRequest->responseComplete();
701         client->requestFinished(this);
702
703         setTerminator("\r\n");
704     }
705
706     Client* client;
707     Request_ptr activeRequest;
708     ConnectionState state;
709     std::string host;
710     short port;
711     std::string buffer;
712     int bodyTransferSize;
713     SGTimeStamp idleTime;
714     bool chunkedTransfer;
715     bool noMessageBody;
716
717     RequestList queuedRequests;
718     RequestList sentRequests;
719
720     ContentDecoder _contentDecoder;
721     std::string _connectionId;
722     unsigned int _maxPipelineLength;
723 };
724 #endif // of !ENABLE_CURL
725
726 Client::Client() :
727     d(new ClientPrivate)
728 {
729     d->proxyPort = 0;
730     d->maxConnections = 4;
731     d->maxHostConnections = 4;
732     d->bytesTransferred = 0;
733     d->lastTransferRate = 0;
734     d->timeTransferSample.stamp();
735     d->totalBytesDownloaded = 0;
736     d->maxPipelineDepth = 5;
737     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
738 #if defined(ENABLE_CURL)
739     static bool didInitCurlGlobal = false;
740     if (!didInitCurlGlobal) {
741       curl_global_init(CURL_GLOBAL_ALL);
742       didInitCurlGlobal = true;
743     }
744
745     d->createCurlMulti();
746 #endif
747 }
748
749 Client::~Client()
750 {
751 #if defined(ENABLE_CURL)
752   curl_multi_cleanup(d->curlMulti);
753 #endif
754 }
755
756 void Client::setMaxConnections(unsigned int maxCon)
757 {
758     d->maxConnections = maxCon;
759 #if defined(ENABLE_CURL)
760     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
761 #endif
762 }
763
764 void Client::setMaxHostConnections(unsigned int maxHostCon)
765 {
766     d->maxHostConnections = maxHostCon;
767 #if defined(ENABLE_CURL)
768     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
769 #endif
770 }
771
772 void Client::setMaxPipelineDepth(unsigned int depth)
773 {
774     d->maxPipelineDepth = depth;
775 #if defined(ENABLE_CURL)
776     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
777 #else
778     ConnectionDict::iterator it = d->connections.begin();
779     for (; it != d->connections.end(); ) {
780         it->second->setMaxPipelineLength(depth);
781     }
782 #endif
783 }
784
785 void Client::update(int waitTimeout)
786 {
787 #if defined(ENABLE_CURL)
788     int remainingActive, messagesInQueue;
789     curl_multi_perform(d->curlMulti, &remainingActive);
790     d->haveActiveRequests = (remainingActive > 0);
791
792     CURLMsg* msg;
793     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
794       if (msg->msg == CURLMSG_DONE) {
795         Request* req;
796         CURL *e = msg->easy_handle;
797         curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
798
799         long responseCode;
800         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
801
802         if (msg->data.result == 0) {
803           req->responseComplete();
804         } else {
805           fprintf(stderr, "Result: %d - %s\n",
806                 msg->data.result, curl_easy_strerror(msg->data.result));
807           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
808         }
809
810         curl_multi_remove_handle(d->curlMulti, e);
811
812         // balance the reference we take in makeRequest
813         SGReferenced::put(req);
814         curl_easy_cleanup(e);
815       }
816       else {
817         SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
818       }
819     } // of curl message processing loop
820 #else
821     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
822         SGTimeStamp::sleepForMSec(waitTimeout);
823     } else {
824         d->poller.poll(waitTimeout);
825     }
826
827     bool waitingRequests = !d->pendingRequests.empty();
828     ConnectionDict::iterator it = d->connections.begin();
829     for (; it != d->connections.end(); ) {
830         Connection* con = it->second;
831         if (con->hasIdleTimeout() ||
832             con->hasError() ||
833             con->hasErrorTimeout() ||
834             (!con->isActive() && waitingRequests))
835         {
836             if (con->hasErrorTimeout()) {
837                 // tell the connection we're timing it out
838                 con->handleTimeout();
839             }
840
841         // connection has been idle for a while, clean it up
842         // (or if we have requests waiting for a different host,
843         // or an error condition
844             ConnectionDict::iterator del = it++;
845             delete del->second;
846             d->connections.erase(del);
847         } else {
848             if (it->second->shouldStartNext()) {
849                 it->second->tryStartNextRequest();
850             }
851             ++it;
852         }
853     } // of connection iteration
854
855     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
856         RequestList waiting(d->pendingRequests);
857         d->pendingRequests.clear();
858
859         // re-submit all waiting requests in order; this takes care of
860         // finding multiple pending items targetted to the same (new)
861         // connection
862         BOOST_FOREACH(Request_ptr req, waiting) {
863             makeRequest(req);
864         }
865     }
866 #endif
867 }
868
869 void Client::makeRequest(const Request_ptr& r)
870 {
871     if( r->isComplete() )
872       return;
873
874     if( r->url().find("://") == std::string::npos ) {
875         r->setFailure(EINVAL, "malformed URL");
876         return;
877     }
878
879     r->_client = this;
880
881 #if defined(ENABLE_CURL)
882     CURL* curlRequest = curl_easy_init();
883     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
884
885     // manually increase the ref count of the request
886     SGReferenced::get(r.get());
887     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
888     // disable built-in libCurl progress feedback
889     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
890
891     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
892     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
893     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
894     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
895
896     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
897     curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
898
899     if (!d->proxy.empty()) {
900       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
901       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
902
903       if (!d->proxyAuth.empty()) {
904         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
905         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
906       }
907     }
908
909     std::string method = boost::to_lower_copy(r->method());
910     if (method == "get") {
911       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
912     } else if (method == "put") {
913       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
914       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
915     } else if (method == "post") {
916       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
917       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
918
919       std::string q = r->query().substr(1);
920       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
921
922       // reset URL to exclude query pieces
923       std::string urlWithoutQuery = r->url();
924       std::string::size_type queryPos = urlWithoutQuery.find('?');
925       urlWithoutQuery.resize(queryPos);
926       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
927     } else {
928       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
929     }
930
931     struct curl_slist* headerList = NULL;
932     if (r->hasBodyData() && (method != "post")) {
933       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
934       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
935       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
936       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
937       std::string h = "Content-Type:" + r->bodyType();
938       headerList = curl_slist_append(headerList, h.c_str());
939     }
940
941     StringMap::const_iterator it;
942     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
943       std::string h = it->first + ": " + it->second;
944       headerList = curl_slist_append(headerList, h.c_str());
945     }
946
947     if (headerList != NULL) {
948       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
949     }
950
951     curl_multi_add_handle(d->curlMulti, curlRequest);
952     d->haveActiveRequests = true;
953
954 // FIXME - premature?
955     r->requestStart();
956
957 #else
958     if( r->url().find("http://") != 0 ) {
959         r->setFailure(EINVAL, "only HTTP protocol is supported");
960         return;
961     }
962
963     std::string host = r->host();
964     int port = r->port();
965     if (!d->proxy.empty()) {
966         host = d->proxy;
967         port = d->proxyPort;
968     }
969
970     Connection* con = NULL;
971     std::stringstream ss;
972     ss << host << "-" << port;
973     std::string connectionId = ss.str();
974     bool havePending = !d->pendingRequests.empty();
975     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
976     ConnectionDict::iterator consEnd = d->connections.end();
977
978     // assign request to an existing Connection.
979     // various options exist here, examined in order
980     ConnectionDict::iterator it = d->connections.find(connectionId);
981     if (atConnectionsLimit && (it == consEnd)) {
982         // maximum number of connections active, queue this request
983         // when a connection goes inactive, we'll start this one
984         d->pendingRequests.push_back(r);
985         return;
986     }
987
988     // scan for an idle Connection to the same host (likely if we're
989     // retrieving multiple resources from the same host in quick succession)
990     // if we have pending requests (waiting for a free Connection), then
991     // force new requests on this id to always use the first Connection
992     // (instead of the random selection below). This ensures that when
993     // there's pressure on the number of connections to keep alive, one
994     // host can't DoS every other.
995     int count = 0;
996     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
997         if (havePending || !it->second->isActive()) {
998             con = it->second;
999             break;
1000         }
1001     }
1002
1003     bool atHostConnectionsLimit = (count >= d->maxHostConnections);
1004
1005     if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
1006         // all current connections are busy (active), and we don't
1007         // have free connections to allocate, so let's assign to
1008         // an existing one randomly. Ideally we'd used whichever one will
1009         // complete first but we don't have that info.
1010         int index = rand() % count;
1011         for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
1012         con = it->second;
1013     }
1014
1015     // allocate a new connection object
1016     if (!con) {
1017         static int connectionSuffx = 0;
1018
1019         std::stringstream ss;
1020         ss << connectionId << "-" << connectionSuffx++;
1021
1022         SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
1023         con = new Connection(this, ss.str());
1024         con->setServer(host, port);
1025         con->setMaxPipelineLength(d->maxPipelineDepth);
1026         d->poller.addChannel(con);
1027         d->connections.insert(d->connections.end(),
1028             ConnectionDict::value_type(connectionId, con));
1029     }
1030
1031     SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
1032     con->queueRequest(r);
1033 #endif
1034 }
1035
1036 //------------------------------------------------------------------------------
1037 FileRequestRef Client::save( const std::string& url,
1038                              const std::string& filename )
1039 {
1040   FileRequestRef req = new FileRequest(url, filename);
1041   makeRequest(req);
1042   return req;
1043 }
1044
1045 //------------------------------------------------------------------------------
1046 MemoryRequestRef Client::load(const std::string& url)
1047 {
1048   MemoryRequestRef req = new MemoryRequest(url);
1049   makeRequest(req);
1050   return req;
1051 }
1052
1053 void Client::requestFinished(Connection* con)
1054 {
1055
1056 }
1057
1058 void Client::setUserAgent(const std::string& ua)
1059 {
1060     d->userAgent = ua;
1061 }
1062
1063 const std::string& Client::userAgent() const
1064 {
1065     return d->userAgent;
1066 }
1067
1068 const std::string& Client::proxyHost() const
1069 {
1070     return d->proxy;
1071 }
1072
1073 const std::string& Client::proxyAuth() const
1074 {
1075     return d->proxyAuth;
1076 }
1077
1078 void Client::setProxy( const std::string& proxy,
1079                        int port,
1080                        const std::string& auth )
1081 {
1082     d->proxy = proxy;
1083     d->proxyPort = port;
1084     d->proxyAuth = auth;
1085 }
1086
1087 bool Client::hasActiveRequests() const
1088 {
1089   #if defined(ENABLE_CURL)
1090     return d->haveActiveRequests;
1091   #else
1092     ConnectionDict::const_iterator it = d->connections.begin();
1093     for (; it != d->connections.end(); ++it) {
1094         if (it->second->isActive()) return true;
1095     }
1096
1097     return false;
1098 #endif
1099 }
1100
1101 void Client::receivedBytes(unsigned int count)
1102 {
1103     d->bytesTransferred += count;
1104     d->totalBytesDownloaded += count;
1105 }
1106
1107 unsigned int Client::transferRateBytesPerSec() const
1108 {
1109     unsigned int e = d->timeTransferSample.elapsedMSec();
1110     if (e > 400) {
1111         // too long a window, ignore
1112         d->timeTransferSample.stamp();
1113         d->bytesTransferred = 0;
1114         d->lastTransferRate = 0;
1115         return 0;
1116     }
1117
1118     if (e < 100) { // avoid really narrow windows
1119         return d->lastTransferRate;
1120     }
1121
1122     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1123     // run a low-pass filter
1124     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1125     smoothed /= 400;
1126
1127     d->timeTransferSample.stamp();
1128     d->bytesTransferred = 0;
1129     d->lastTransferRate = smoothed;
1130     return smoothed;
1131 }
1132
1133 uint64_t Client::totalBytesDownloaded() const
1134 {
1135     return d->totalBytesDownloaded;
1136 }
1137
1138 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1139 {
1140   size_t byteSize = size * nmemb;
1141   Request* req = static_cast<Request*>(userdata);
1142   req->processBodyBytes(ptr, byteSize);
1143
1144   Client* cl = req->http();
1145   if (cl) {
1146     cl->receivedBytes(byteSize);
1147   }
1148
1149   return byteSize;
1150 }
1151
1152 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1153 {
1154   size_t maxBytes = size * nmemb;
1155   Request* req = static_cast<Request*>(userdata);
1156   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1157   return actualBytes;
1158 }
1159
1160 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1161 {
1162   size_t byteSize = size * nitems;
1163   Request* req = static_cast<Request*>(userdata);
1164   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1165
1166   if (req->readyState() == HTTP::Request::OPENED) {
1167     req->responseStart(h);
1168     return byteSize;
1169   }
1170
1171   if (h.empty()) {
1172       // got a 100-continue reponse; restart
1173       if (req->responseCode() == 100) {
1174           req->setReadyState(HTTP::Request::OPENED);
1175           return byteSize;
1176       }
1177
1178     req->responseHeadersComplete();
1179     return byteSize;
1180   }
1181
1182   if (req->responseCode() == 100) {
1183       return byteSize; // skip headers associated with 100-continue status
1184   }
1185
1186   size_t colonPos = h.find(':');
1187   if (colonPos == std::string::npos) {
1188       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1189       return byteSize;
1190   }
1191
1192   std::string key = strutils::simplify(h.substr(0, colonPos));
1193   std::string lkey = boost::to_lower_copy(key);
1194   std::string value = strutils::strip(h.substr(colonPos + 1));
1195
1196   req->responseHeader(lkey, value);
1197   return byteSize;
1198 }
1199
1200 void Client::debugDumpRequests()
1201 {
1202 #if defined(ENABLE_CURL)
1203
1204 #else
1205     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1206     ConnectionDict::iterator it = d->connections.begin();
1207     for (; it != d->connections.end(); ++it) {
1208         it->second->debugDumpRequests();
1209     }
1210     SG_LOG(SG_IO, SG_INFO, "==");
1211 #endif
1212 }
1213
1214 void Client::clearAllConnections()
1215 {
1216 #if defined(ENABLE_CURL)
1217     curl_multi_cleanup(d->curlMulti);
1218     d->createCurlMulti();
1219 #else
1220     ConnectionDict::iterator it = d->connections.begin();
1221     for (; it != d->connections.end(); ++it) {
1222         delete it->second;
1223     }
1224     d->connections.clear();
1225 #endif
1226 }
1227
1228 } // of namespace HTTP
1229
1230 } // of namespace simgear