]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
d42519d8b9af017c4b1f86da1ce46c934d490116
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimHear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
71
72 class Connection;
73 typedef std::multimap<std::string, Connection*> ConnectionDict;
74 typedef std::list<Request_ptr> RequestList;
75
76 class Client::ClientPrivate
77 {
78 public:
79 #if defined(ENABLE_CURL)
80     CURLM* curlMulti;
81     bool haveActiveRequests;
82
83     void createCurlMulti()
84     {
85         curlMulti = curl_multi_init();
86         // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
87         // we request HTTP 1.1 pipelining
88         curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
89         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
90         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
91                           (long) maxPipelineDepth);
92         curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
93                           (long) maxHostConnections);
94
95
96     }
97 #else
98     NetChannelPoller poller;
99 // connections by host (potentially more than one)
100     ConnectionDict connections;
101 #endif
102
103     std::string userAgent;
104     std::string proxy;
105     int proxyPort;
106     std::string proxyAuth;
107     unsigned int maxConnections;
108     unsigned int maxHostConnections;
109     unsigned int maxPipelineDepth;
110
111     RequestList pendingRequests;
112
113     SGTimeStamp timeTransferSample;
114     unsigned int bytesTransferred;
115     unsigned int lastTransferRate;
116     uint64_t totalBytesDownloaded;
117 };
118
119 #if !defined(ENABLE_CURL)
120 class Connection : public NetChat
121 {
122 public:
123     Connection(Client* pr, const std::string& conId) :
124         client(pr),
125         state(STATE_CLOSED),
126         port(DEFAULT_HTTP_PORT),
127         _connectionId(conId),
128         _maxPipelineLength(255)
129     {
130     }
131
132     virtual ~Connection()
133     {
134     }
135
136     virtual void handleBufferRead (NetBuffer& buffer)
137     {
138       if( !activeRequest || !activeRequest->isComplete() )
139         return NetChat::handleBufferRead(buffer);
140
141       // Request should be aborted (signaled by setting its state to complete).
142
143       // force the state to GETTING_BODY, to simplify logic in
144       // responseComplete and handleClose
145         setState(STATE_GETTING_BODY);
146       responseComplete();
147     }
148
149     void setServer(const std::string& h, short p)
150     {
151         host = h;
152         port = p;
153     }
154
155     void setMaxPipelineLength(unsigned int m)
156     {
157         _maxPipelineLength = m;
158     }
159
160     // socket-level errors
161     virtual void handleError(int error)
162     {
163         const char* errStr = strerror(error);
164         SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
165                << errStr << ")");
166
167         debugDumpRequests();
168
169         if (!activeRequest)
170         {
171         // connection level failure, eg name lookup or routing
172         // we won't have an active request yet, so let's fail all of the
173         // requests since we presume it's a systematic failure for
174         // the host in question
175             BOOST_FOREACH(Request_ptr req, sentRequests) {
176                 req->setFailure(error, errStr);
177             }
178
179             BOOST_FOREACH(Request_ptr req, queuedRequests) {
180                 req->setFailure(error, errStr);
181             }
182
183             sentRequests.clear();
184             queuedRequests.clear();
185         }
186
187         NetChat::handleError(error);
188         if (activeRequest) {
189             activeRequest->setFailure(error, errStr);
190             activeRequest = NULL;
191             _contentDecoder.reset();
192         }
193
194         setState(STATE_SOCKET_ERROR);
195     }
196
197     void handleTimeout()
198     {
199         handleError(ETIMEDOUT);
200     }
201
202     virtual void handleClose()
203     {
204         NetChat::handleClose();
205
206     // closing of the connection from the server side when getting the body,
207         bool canCloseState = (state == STATE_GETTING_BODY);
208         if (canCloseState && activeRequest) {
209             // check bodyTransferSize matches how much we actually transferred
210             if (bodyTransferSize > 0) {
211                 if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
212                     SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
213                            << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
214                 }
215             }
216
217         // force state here, so responseComplete can avoid closing the
218         // socket again
219             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
220             setState(STATE_CLOSED);
221             responseComplete();
222         } else {
223             if (state == STATE_WAITING_FOR_RESPONSE) {
224                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
225                        << sentRequests.front()->url());
226                 assert(!sentRequests.empty());
227                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
228                 // no active request, but don't restore the front sent one
229                 sentRequests.erase(sentRequests.begin());
230             }
231
232             if (activeRequest) {
233                 activeRequest->setFailure(500, "server closed connection");
234                 // remove the failed request from sentRequests, so it does
235                 // not get restored
236                 RequestList::iterator it = std::find(sentRequests.begin(),
237                     sentRequests.end(), activeRequest);
238                 if (it != sentRequests.end()) {
239                     sentRequests.erase(it);
240                 }
241                 activeRequest = NULL;
242                 _contentDecoder.reset();
243             }
244
245             setState(STATE_CLOSED);
246         }
247
248       if (sentRequests.empty()) {
249         return;
250       }
251
252     // restore sent requests to the queue, so they will be re-sent
253     // when the connection opens again
254       queuedRequests.insert(queuedRequests.begin(),
255                               sentRequests.begin(), sentRequests.end());
256       sentRequests.clear();
257     }
258
259     void queueRequest(const Request_ptr& r)
260     {
261         queuedRequests.push_back(r);
262         tryStartNextRequest();
263     }
264
265     void beginResponse()
266     {
267         assert(!sentRequests.empty());
268         assert(state == STATE_WAITING_FOR_RESPONSE);
269
270         activeRequest = sentRequests.front();
271         try {
272             SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
273             activeRequest->responseStart(buffer);
274         } catch (sg_exception& e) {
275             handleError(EIO);
276             return;
277         }
278
279       setState(STATE_GETTING_HEADERS);
280       buffer.clear();
281       if (activeRequest->responseCode() == 204) {
282         noMessageBody = true;
283       } else if (activeRequest->method() == "HEAD") {
284         noMessageBody = true;
285       } else {
286         noMessageBody = false;
287       }
288
289       bodyTransferSize = -1;
290       chunkedTransfer = false;
291       _contentDecoder.reset();
292     }
293
294     void tryStartNextRequest()
295     {
296       while( !queuedRequests.empty()
297           && queuedRequests.front()->isComplete() )
298         queuedRequests.pop_front();
299
300       if (queuedRequests.empty()) {
301         idleTime.stamp();
302         return;
303       }
304
305       if (sentRequests.size() >= _maxPipelineLength) {
306         return;
307       }
308
309       if (state == STATE_CLOSED) {
310           if (!connectToHost()) {
311               setState(STATE_SOCKET_ERROR);
312               return;
313           }
314
315           SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
316           setTerminator("\r\n");
317           setState(STATE_IDLE);
318       }
319
320       Request_ptr r = queuedRequests.front();
321       r->requestStart();
322
323       std::stringstream headerData;
324       std::string path = r->path();
325       assert(!path.empty());
326       std::string query = r->query();
327       std::string bodyData;
328
329       if (!client->proxyHost().empty()) {
330           path = r->scheme() + "://" + r->host() + r->path();
331       }
332
333       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
334           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
335           bodyData = query.substr(1); // URL-encode, drop the leading '?'
336           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
337           headerData << "Content-Length:" << bodyData.size() << "\r\n";
338       } else {
339           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
340           if( r->hasBodyData() )
341           {
342             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
343             headerData << "Content-Type:" << r->bodyType() << "\r\n";
344           }
345       }
346
347       headerData << "Host: " << r->hostAndPort() << "\r\n";
348       headerData << "User-Agent:" << client->userAgent() << "\r\n";
349       headerData << "Accept-Encoding: deflate, gzip\r\n";
350       if (!client->proxyAuth().empty()) {
351           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
352       }
353
354       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
355           headerData << h.first << ": " << h.second << "\r\n";
356       }
357
358       headerData << "\r\n"; // final CRLF to terminate the headers
359       if (!bodyData.empty()) {
360           headerData << bodyData;
361       }
362
363       bool ok = push(headerData.str().c_str());
364       if (!ok) {
365           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
366           // we've over-stuffed the socket, give up for now, let things
367           // drain down before trying to start any more requests.
368           return;
369       }
370
371       if( r->hasBodyData() )
372         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
373         {
374           char buf[4096];
375           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
376           if( len )
377           {
378             if( !bufferSend(buf, len) )
379             {
380               SG_LOG(SG_IO,
381                      SG_WARN,
382                      "overflow the HTTP::Connection output buffer");
383               state = STATE_SOCKET_ERROR;
384               return;
385             }
386             body_bytes_sent += len;
387           }
388           else
389           {
390             SG_LOG(SG_IO,
391                    SG_WARN,
392                    "HTTP asynchronous request body generation is unsupported");
393             break;
394           }
395         }
396
397         SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
398       // successfully sent, remove from queue, and maybe send the next
399       queuedRequests.pop_front();
400       sentRequests.push_back(r);
401       if (state == STATE_IDLE) {
402           setState(STATE_WAITING_FOR_RESPONSE);
403       }
404
405       // pipelining, let's maybe send the next request right away
406       tryStartNextRequest();
407     }
408
409     virtual void collectIncomingData(const char* s, int n)
410     {
411         idleTime.stamp();
412         client->receivedBytes(static_cast<unsigned int>(n));
413
414         if(   (state == STATE_GETTING_BODY)
415            || (state == STATE_GETTING_CHUNKED_BYTES) )
416           _contentDecoder.receivedBytes(s, n);
417         else
418           buffer.append(s, n);
419     }
420
421     virtual void foundTerminator(void)
422     {
423         idleTime.stamp();
424         switch (state) {
425         case STATE_WAITING_FOR_RESPONSE:
426             beginResponse();
427             break;
428
429         case STATE_GETTING_HEADERS:
430             processHeader();
431             buffer.clear();
432             break;
433
434         case STATE_GETTING_BODY:
435             responseComplete();
436             break;
437
438         case STATE_GETTING_CHUNKED:
439             processChunkHeader();
440             break;
441
442         case STATE_GETTING_CHUNKED_BYTES:
443             setTerminator("\r\n");
444             setState(STATE_GETTING_CHUNKED);
445             buffer.clear();
446             break;
447
448
449         case STATE_GETTING_TRAILER:
450             processTrailer();
451             buffer.clear();
452             break;
453
454         case STATE_IDLE:
455             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
456
457         default:
458             break;
459         }
460     }
461
462     bool hasIdleTimeout() const
463     {
464         if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
465             return false;
466         }
467
468         assert(sentRequests.empty());
469         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
470         return isTimedOut;
471     }
472
473     bool hasErrorTimeout() const
474     {
475       if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
476         return false;
477       }
478
479         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
480         return isTimedOut;
481     }
482
483     bool hasError() const
484     {
485         return (state == STATE_SOCKET_ERROR);
486     }
487
488     bool shouldStartNext() const
489     {
490       return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
491     }
492
493     bool isActive() const
494     {
495         return !queuedRequests.empty() || !sentRequests.empty();
496     }
497
498     std::string connectionId() const
499     {
500         return _connectionId;
501     }
502
503     void debugDumpRequests() const
504     {
505         SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
506                << "; state=" << state << ")");
507         if (activeRequest) {
508             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
509         } else {
510             SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
511         }
512
513         BOOST_FOREACH(Request_ptr req, sentRequests) {
514             SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
515         }
516
517         BOOST_FOREACH(Request_ptr req, queuedRequests) {
518             SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
519         }
520     }
521 private:
522     enum ConnectionState {
523         STATE_IDLE = 0,
524         STATE_WAITING_FOR_RESPONSE,
525         STATE_GETTING_HEADERS,
526         STATE_GETTING_BODY,
527         STATE_GETTING_CHUNKED,
528         STATE_GETTING_CHUNKED_BYTES,
529         STATE_GETTING_TRAILER,
530         STATE_SOCKET_ERROR,
531         STATE_CLOSED             ///< connection should be closed now
532     };
533
534     void setState(ConnectionState newState)
535     {
536         if (state == newState) {
537             return;
538         }
539
540         state = newState;
541     }
542
543     bool connectToHost()
544     {
545         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
546
547         if (!open()) {
548             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
549             return false;
550         }
551
552         if (connect(host.c_str(), port) != 0) {
553             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
554             return false;
555         }
556
557         return true;
558     }
559
560
561     void processHeader()
562     {
563         std::string h = strutils::simplify(buffer);
564         if (h.empty()) { // blank line terminates headers
565             headersComplete();
566             return;
567         }
568
569         int colonPos = buffer.find(':');
570         if (colonPos < 0) {
571             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
572             return;
573         }
574
575         std::string key = strutils::simplify(buffer.substr(0, colonPos));
576         std::string lkey = boost::to_lower_copy(key);
577         std::string value = strutils::strip(buffer.substr(colonPos + 1));
578
579         // only consider these if getting headers (as opposed to trailers
580         // of a chunked transfer)
581         if (state == STATE_GETTING_HEADERS) {
582             if (lkey == "content-length") {
583
584                 int sz = strutils::to_int(value);
585                 if (bodyTransferSize <= 0) {
586                     bodyTransferSize = sz;
587                 }
588                 activeRequest->setResponseLength(sz);
589             } else if (lkey == "transfer-length") {
590                 bodyTransferSize = strutils::to_int(value);
591             } else if (lkey == "transfer-encoding") {
592                 processTransferEncoding(value);
593             } else if (lkey == "content-encoding") {
594                 _contentDecoder.setEncoding(value);
595             }
596         }
597
598         activeRequest->responseHeader(lkey, value);
599     }
600
601     void processTransferEncoding(const std::string& te)
602     {
603         if (te == "chunked") {
604             chunkedTransfer = true;
605         } else {
606             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
607             // failure
608         }
609     }
610
611     void processChunkHeader()
612     {
613         if (buffer.empty()) {
614             // blank line after chunk data
615             return;
616         }
617
618         int chunkSize = 0;
619         int semiPos = buffer.find(';');
620         if (semiPos >= 0) {
621             // extensions ignored for the moment
622             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
623         } else {
624             chunkSize = strutils::to_int(buffer, 16);
625         }
626
627         buffer.clear();
628         if (chunkSize == 0) {  //  trailer start
629             setState(STATE_GETTING_TRAILER);
630             return;
631         }
632
633         setState(STATE_GETTING_CHUNKED_BYTES);
634         setByteCount(chunkSize);
635     }
636
637     void processTrailer()
638     {
639         if (buffer.empty()) {
640             // end of trailers
641             responseComplete();
642             return;
643         }
644
645     // process as a normal header
646         processHeader();
647     }
648
649     void headersComplete()
650     {
651         activeRequest->responseHeadersComplete();
652         _contentDecoder.initWithRequest(activeRequest);
653
654         if (!activeRequest->serverSupportsPipelining()) {
655             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " disabling pipelining since server does not support it");
656             _maxPipelineLength = 1;
657         }
658
659         if (chunkedTransfer) {
660             setState(STATE_GETTING_CHUNKED);
661         } else if (noMessageBody || (bodyTransferSize == 0)) {
662             // force the state to GETTING_BODY, to simplify logic in
663             // responseComplete and handleClose
664             setState(STATE_GETTING_BODY);
665             responseComplete();
666         } else {
667             setByteCount(bodyTransferSize); // may be -1, that's fine
668             setState(STATE_GETTING_BODY);
669         }
670     }
671
672     void responseComplete()
673     {
674         Request_ptr completedRequest = activeRequest;
675         _contentDecoder.finish();
676
677         assert(sentRequests.front() == activeRequest);
678         sentRequests.pop_front();
679         bool doClose = activeRequest->closeAfterComplete();
680         activeRequest = NULL;
681
682         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
683             if (doClose) {
684                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
685           // this will bring us into handleClose() above, which updates
686           // state to STATE_CLOSED
687               close();
688
689           // if we have additional requests waiting, try to start them now
690               tryStartNextRequest();
691             }
692         }
693
694         if (state != STATE_CLOSED)  {
695             setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
696         }
697
698     // notify request after we change state, so this connection is idle
699     // if completion triggers other requests (which is likely)
700         completedRequest->responseComplete();
701         client->requestFinished(this);
702
703         setTerminator("\r\n");
704     }
705
706     Client* client;
707     Request_ptr activeRequest;
708     ConnectionState state;
709     std::string host;
710     short port;
711     std::string buffer;
712     int bodyTransferSize;
713     SGTimeStamp idleTime;
714     bool chunkedTransfer;
715     bool noMessageBody;
716
717     RequestList queuedRequests;
718     RequestList sentRequests;
719
720     ContentDecoder _contentDecoder;
721     std::string _connectionId;
722     unsigned int _maxPipelineLength;
723 };
724 #endif // of !ENABLE_CURL
725
726 Client::Client() :
727     d(new ClientPrivate)
728 {
729     d->proxyPort = 0;
730     d->maxConnections = 4;
731     d->maxHostConnections = 4;
732     d->bytesTransferred = 0;
733     d->lastTransferRate = 0;
734     d->timeTransferSample.stamp();
735     d->totalBytesDownloaded = 0;
736     d->maxPipelineDepth = 5;
737     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
738 #if defined(ENABLE_CURL)
739     static bool didInitCurlGlobal = false;
740     if (!didInitCurlGlobal) {
741       curl_global_init(CURL_GLOBAL_ALL);
742       didInitCurlGlobal = true;
743     }
744
745     d->createCurlMulti();
746 #endif
747 }
748
749 Client::~Client()
750 {
751 #if defined(ENABLE_CURL)
752   curl_multi_cleanup(d->curlMulti);
753 #endif
754 }
755
756 void Client::setMaxConnections(unsigned int maxCon)
757 {
758     d->maxConnections = maxCon;
759 #if defined(ENABLE_CURL)
760     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
761 #endif
762 }
763
764 void Client::setMaxHostConnections(unsigned int maxHostCon)
765 {
766     d->maxHostConnections = maxHostCon;
767 #if defined(ENABLE_CURL)
768     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
769 #endif
770 }
771
772 void Client::setMaxPipelineDepth(unsigned int depth)
773 {
774     d->maxPipelineDepth = depth;
775 #if defined(ENABLE_CURL)
776     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
777 #else
778     ConnectionDict::iterator it = d->connections.begin();
779     for (; it != d->connections.end(); ) {
780         it->second->setMaxPipelineLength(depth);
781     }
782 #endif
783 }
784
785 void Client::update(int waitTimeout)
786 {
787 #if defined(ENABLE_CURL)
788     int remainingActive, messagesInQueue;
789     curl_multi_perform(d->curlMulti, &remainingActive);
790     d->haveActiveRequests = (remainingActive > 0);
791
792     CURLMsg* msg;
793     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
794       if (msg->msg == CURLMSG_DONE) {
795         Request* req;
796         CURL *e = msg->easy_handle;
797         curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
798
799         long responseCode;
800         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
801
802         if (msg->data.result == 0) {
803           req->responseComplete();
804         } else {
805           fprintf(stderr, "Result: %d - %s\n",
806                 msg->data.result, curl_easy_strerror(msg->data.result));
807           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
808         }
809
810         curl_multi_remove_handle(d->curlMulti, e);
811
812         // balance the reference we take in makeRequest
813         SGReferenced::put(req);
814         curl_easy_cleanup(e);
815       }
816       else {
817         SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
818       }
819     } // of curl message processing loop
820 #else
821     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
822         SGTimeStamp::sleepForMSec(waitTimeout);
823     } else {
824         d->poller.poll(waitTimeout);
825     }
826
827     bool waitingRequests = !d->pendingRequests.empty();
828     ConnectionDict::iterator it = d->connections.begin();
829     for (; it != d->connections.end(); ) {
830         Connection* con = it->second;
831         if (con->hasIdleTimeout() ||
832             con->hasError() ||
833             con->hasErrorTimeout() ||
834             (!con->isActive() && waitingRequests))
835         {
836             if (con->hasErrorTimeout()) {
837                 // tell the connection we're timing it out
838                 con->handleTimeout();
839             }
840
841         // connection has been idle for a while, clean it up
842         // (or if we have requests waiting for a different host,
843         // or an error condition
844             ConnectionDict::iterator del = it++;
845             delete del->second;
846             d->connections.erase(del);
847         } else {
848             if (it->second->shouldStartNext()) {
849                 it->second->tryStartNextRequest();
850             }
851             ++it;
852         }
853     } // of connection iteration
854
855     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
856         RequestList waiting(d->pendingRequests);
857         d->pendingRequests.clear();
858
859         // re-submit all waiting requests in order; this takes care of
860         // finding multiple pending items targetted to the same (new)
861         // connection
862         BOOST_FOREACH(Request_ptr req, waiting) {
863             makeRequest(req);
864         }
865     }
866 #endif
867 }
868
869 void Client::makeRequest(const Request_ptr& r)
870 {
871     if( r->isComplete() )
872       return;
873
874     if( r->url().find("://") == std::string::npos ) {
875         r->setFailure(EINVAL, "malformed URL");
876         return;
877     }
878
879 #if defined(ENABLE_CURL)
880     CURL* curlRequest = curl_easy_init();
881     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
882
883     // manually increase the ref count of the request
884     SGReferenced::get(r.get());
885     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
886     // disable built-in libCurl progress feedback
887     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
888
889     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
890     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
891     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
892     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
893
894     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
895     curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
896
897     if (!d->proxy.empty()) {
898       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
899       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
900
901       if (!d->proxyAuth.empty()) {
902         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
903         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
904       }
905     }
906
907     std::string method = boost::to_lower_copy(r->method());
908     if (method == "get") {
909       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
910     } else if (method == "put") {
911       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
912       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
913     } else if (method == "post") {
914       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
915       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
916
917       std::string q = r->query().substr(1);
918       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
919
920       // reset URL to exclude query pieces
921       std::string urlWithoutQuery = r->url();
922       std::string::size_type queryPos = urlWithoutQuery.find('?');
923       urlWithoutQuery.resize(queryPos);
924       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
925     } else {
926       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
927     }
928
929     struct curl_slist* headerList = NULL;
930     if (r->hasBodyData() && (method != "post")) {
931       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
932       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
933       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
934       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
935       std::string h = "Content-Type:" + r->bodyType();
936       headerList = curl_slist_append(headerList, h.c_str());
937     }
938
939     StringMap::const_iterator it;
940     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
941       std::string h = it->first + ": " + it->second;
942       headerList = curl_slist_append(headerList, h.c_str());
943     }
944
945     if (headerList != NULL) {
946       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
947     }
948
949     curl_multi_add_handle(d->curlMulti, curlRequest);
950     d->haveActiveRequests = true;
951
952 // FIXME - premature?
953     r->requestStart();
954
955 #else
956     if( r->url().find("http://") != 0 ) {
957         r->setFailure(EINVAL, "only HTTP protocol is supported");
958         return;
959     }
960
961     std::string host = r->host();
962     int port = r->port();
963     if (!d->proxy.empty()) {
964         host = d->proxy;
965         port = d->proxyPort;
966     }
967
968     Connection* con = NULL;
969     std::stringstream ss;
970     ss << host << "-" << port;
971     std::string connectionId = ss.str();
972     bool havePending = !d->pendingRequests.empty();
973     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
974     ConnectionDict::iterator consEnd = d->connections.end();
975
976     // assign request to an existing Connection.
977     // various options exist here, examined in order
978     ConnectionDict::iterator it = d->connections.find(connectionId);
979     if (atConnectionsLimit && (it == consEnd)) {
980         // maximum number of connections active, queue this request
981         // when a connection goes inactive, we'll start this one
982         d->pendingRequests.push_back(r);
983         return;
984     }
985
986     // scan for an idle Connection to the same host (likely if we're
987     // retrieving multiple resources from the same host in quick succession)
988     // if we have pending requests (waiting for a free Connection), then
989     // force new requests on this id to always use the first Connection
990     // (instead of the random selection below). This ensures that when
991     // there's pressure on the number of connections to keep alive, one
992     // host can't DoS every other.
993     int count = 0;
994     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
995         if (havePending || !it->second->isActive()) {
996             con = it->second;
997             break;
998         }
999     }
1000
1001     bool atHostConnectionsLimit = (count >= d->maxHostConnections);
1002
1003     if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
1004         // all current connections are busy (active), and we don't
1005         // have free connections to allocate, so let's assign to
1006         // an existing one randomly. Ideally we'd used whichever one will
1007         // complete first but we don't have that info.
1008         int index = rand() % count;
1009         for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
1010         con = it->second;
1011     }
1012
1013     // allocate a new connection object
1014     if (!con) {
1015         static int connectionSuffx = 0;
1016
1017         std::stringstream ss;
1018         ss << connectionId << "-" << connectionSuffx++;
1019
1020         SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
1021         con = new Connection(this, ss.str());
1022         con->setServer(host, port);
1023         con->setMaxPipelineLength(d->maxPipelineDepth);
1024         d->poller.addChannel(con);
1025         d->connections.insert(d->connections.end(),
1026             ConnectionDict::value_type(connectionId, con));
1027     }
1028
1029     SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
1030     con->queueRequest(r);
1031 #endif
1032 }
1033
1034 //------------------------------------------------------------------------------
1035 FileRequestRef Client::save( const std::string& url,
1036                              const std::string& filename )
1037 {
1038   FileRequestRef req = new FileRequest(url, filename);
1039   makeRequest(req);
1040   return req;
1041 }
1042
1043 //------------------------------------------------------------------------------
1044 MemoryRequestRef Client::load(const std::string& url)
1045 {
1046   MemoryRequestRef req = new MemoryRequest(url);
1047   makeRequest(req);
1048   return req;
1049 }
1050
1051 void Client::requestFinished(Connection* con)
1052 {
1053
1054 }
1055
1056 void Client::setUserAgent(const std::string& ua)
1057 {
1058     d->userAgent = ua;
1059 }
1060
1061 const std::string& Client::userAgent() const
1062 {
1063     return d->userAgent;
1064 }
1065
1066 const std::string& Client::proxyHost() const
1067 {
1068     return d->proxy;
1069 }
1070
1071 const std::string& Client::proxyAuth() const
1072 {
1073     return d->proxyAuth;
1074 }
1075
1076 void Client::setProxy( const std::string& proxy,
1077                        int port,
1078                        const std::string& auth )
1079 {
1080     d->proxy = proxy;
1081     d->proxyPort = port;
1082     d->proxyAuth = auth;
1083 }
1084
1085 bool Client::hasActiveRequests() const
1086 {
1087   #if defined(ENABLE_CURL)
1088     return d->haveActiveRequests;
1089   #else
1090     ConnectionDict::const_iterator it = d->connections.begin();
1091     for (; it != d->connections.end(); ++it) {
1092         if (it->second->isActive()) return true;
1093     }
1094
1095     return false;
1096 #endif
1097 }
1098
1099 void Client::receivedBytes(unsigned int count)
1100 {
1101     d->bytesTransferred += count;
1102     d->totalBytesDownloaded += count;
1103 }
1104
1105 unsigned int Client::transferRateBytesPerSec() const
1106 {
1107     unsigned int e = d->timeTransferSample.elapsedMSec();
1108     if (e > 400) {
1109         // too long a window, ignore
1110         d->timeTransferSample.stamp();
1111         d->bytesTransferred = 0;
1112         d->lastTransferRate = 0;
1113         return 0;
1114     }
1115
1116     if (e < 100) { // avoid really narrow windows
1117         return d->lastTransferRate;
1118     }
1119
1120     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1121     // run a low-pass filter
1122     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1123     smoothed /= 400;
1124
1125     d->timeTransferSample.stamp();
1126     d->bytesTransferred = 0;
1127     d->lastTransferRate = smoothed;
1128     return smoothed;
1129 }
1130
1131 uint64_t Client::totalBytesDownloaded() const
1132 {
1133     return d->totalBytesDownloaded;
1134 }
1135
1136 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1137 {
1138   size_t byteSize = size * nmemb;
1139
1140   Request* req = static_cast<Request*>(userdata);
1141   req->processBodyBytes(ptr, byteSize);
1142   return byteSize;
1143 }
1144
1145 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1146 {
1147   size_t maxBytes = size * nmemb;
1148   Request* req = static_cast<Request*>(userdata);
1149   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1150   return actualBytes;
1151 }
1152
1153 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1154 {
1155   size_t byteSize = size * nitems;
1156   Request* req = static_cast<Request*>(userdata);
1157   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1158
1159   if (req->readyState() == HTTP::Request::OPENED) {
1160     req->responseStart(h);
1161     return byteSize;
1162   }
1163
1164   if (h.empty()) {
1165       // got a 100-continue reponse; restart
1166       if (req->responseCode() == 100) {
1167           req->setReadyState(HTTP::Request::OPENED);
1168           return byteSize;
1169       }
1170
1171     req->responseHeadersComplete();
1172     return byteSize;
1173   }
1174
1175   if (req->responseCode() == 100) {
1176       return byteSize; // skip headers associated with 100-continue status
1177   }
1178
1179   size_t colonPos = h.find(':');
1180   if (colonPos == std::string::npos) {
1181       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1182       return byteSize;
1183   }
1184
1185   std::string key = strutils::simplify(h.substr(0, colonPos));
1186   std::string lkey = boost::to_lower_copy(key);
1187   std::string value = strutils::strip(h.substr(colonPos + 1));
1188
1189   req->responseHeader(lkey, value);
1190   return byteSize;
1191 }
1192
1193 void Client::debugDumpRequests()
1194 {
1195 #if defined(ENABLE_CURL)
1196
1197 #else
1198     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1199     ConnectionDict::iterator it = d->connections.begin();
1200     for (; it != d->connections.end(); ++it) {
1201         it->second->debugDumpRequests();
1202     }
1203     SG_LOG(SG_IO, SG_INFO, "==");
1204 #endif
1205 }
1206
1207 void Client::clearAllConnections()
1208 {
1209 #if defined(ENABLE_CURL)
1210     curl_multi_cleanup(d->curlMulti);
1211     d->createCurlMulti();
1212 #else
1213     ConnectionDict::iterator it = d->connections.begin();
1214     for (; it != d->connections.end(); ++it) {
1215         delete it->second;
1216     }
1217     d->connections.clear();
1218 #endif
1219 }
1220
1221 } // of namespace HTTP
1222
1223 } // of namespace simgear