]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
39d0367966cf265347ab23cb44550ad37748eada
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimGear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
69 extern const int DEFAULT_HTTP_PORT = 80; // initial port of a Connection until setServer() is called
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded"; // body type for which the query string is sent as the request body
71
72 class Connection; // defined below, only when ENABLE_CURL is not set
73 typedef std::multimap<std::string, Connection*> ConnectionDict; // connections by id string; several connections may share one key
74 typedef std::list<Request_ptr> RequestList; // ordered list of requests
75
76 class Client::ClientPrivate
77 {
78 public:
79 #if defined(ENABLE_CURL)
80     CURLM* curlMulti;
81     bool haveActiveRequests;
82
83     void createCurlMulti()
84     {
85         curlMulti = curl_multi_init();
86         // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
87         // we request HTTP 1.1 pipelining
88         curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
89         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
90         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
91                           (long) maxPipelineDepth);
92         curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
93                           (long) maxHostConnections);
94
95
96     }
97 #else
98     NetChannelPoller poller;
99 // connections by host (potentially more than one)
100     ConnectionDict connections;
101 #endif
102
103     std::string userAgent;
104     std::string proxy;
105     int proxyPort;
106     std::string proxyAuth;
107     unsigned int maxConnections;
108     unsigned int maxHostConnections;
109     unsigned int maxPipelineDepth;
110
111     RequestList pendingRequests;
112
113     SGTimeStamp timeTransferSample;
114     unsigned int bytesTransferred;
115     unsigned int lastTransferRate;
116     uint64_t totalBytesDownloaded;
117 };
118
119 #if !defined(ENABLE_CURL)
120 class Connection : public NetChat
121 {
122 public:
123     Connection(Client* pr, const std::string& conId) :
124         client(pr),
125         state(STATE_CLOSED),
126         port(DEFAULT_HTTP_PORT),
127         _connectionId(conId),
128         _maxPipelineLength(255)
129     {
130     }
131
132     virtual ~Connection()
133     {
134     }
135
136     virtual void handleBufferRead (NetBuffer& buffer)
137     {
138       if( !activeRequest || !activeRequest->isComplete() )
139         return NetChat::handleBufferRead(buffer);
140
141       // Request should be aborted (signaled by setting its state to complete).
142
143       // force the state to GETTING_BODY, to simplify logic in
144       // responseComplete and handleClose
145         setState(STATE_GETTING_BODY);
146       responseComplete();
147     }
148
149     void setServer(const std::string& h, short p)
150     {
151         host = h;
152         port = p;
153     }
154
155     void setMaxPipelineLength(unsigned int m)
156     {
157         _maxPipelineLength = m;
158     }
159
160     // socket-level errors
161     virtual void handleError(int error)
162     {
163         const char* errStr = strerror(error);
164         SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
165                << errStr << ")");
166
167         debugDumpRequests();
168
169         if (!activeRequest)
170         {
171         // connection level failure, eg name lookup or routing
172         // we won't have an active request yet, so let's fail all of the
173         // requests since we presume it's a systematic failure for
174         // the host in question
175             BOOST_FOREACH(Request_ptr req, sentRequests) {
176                 req->setFailure(error, errStr);
177             }
178
179             BOOST_FOREACH(Request_ptr req, queuedRequests) {
180                 req->setFailure(error, errStr);
181             }
182
183             sentRequests.clear();
184             queuedRequests.clear();
185         }
186
187         NetChat::handleError(error);
188         if (activeRequest) {
189             activeRequest->setFailure(error, errStr);
190             activeRequest = NULL;
191             _contentDecoder.reset();
192         }
193
194         setState(STATE_SOCKET_ERROR);
195     }
196
197     void handleTimeout()
198     {
199         handleError(ETIMEDOUT);
200     }
201
202     virtual void handleClose()
203     {
204         NetChat::handleClose();
205
206     // closing of the connection from the server side when getting the body,
207         bool canCloseState = (state == STATE_GETTING_BODY);
208         if (canCloseState && activeRequest) {
209             // check bodyTransferSize matches how much we actually transferred
210             if (bodyTransferSize > 0) {
211                 if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
212                     SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
213                            << "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
214                 }
215             }
216
217         // force state here, so responseComplete can avoid closing the
218         // socket again
219             SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
220             setState(STATE_CLOSED);
221             responseComplete();
222         } else {
223             if (state == STATE_WAITING_FOR_RESPONSE) {
224                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
225                        << sentRequests.front()->url());
226                 assert(!sentRequests.empty());
227                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
228                 // no active request, but don't restore the front sent one
229                 sentRequests.erase(sentRequests.begin());
230             }
231
232             if (activeRequest) {
233                 activeRequest->setFailure(500, "server closed connection");
234                 // remove the failed request from sentRequests, so it does
235                 // not get restored
236                 RequestList::iterator it = std::find(sentRequests.begin(),
237                     sentRequests.end(), activeRequest);
238                 if (it != sentRequests.end()) {
239                     sentRequests.erase(it);
240                 }
241                 activeRequest = NULL;
242                 _contentDecoder.reset();
243             }
244
245             setState(STATE_CLOSED);
246         }
247
248       if (sentRequests.empty()) {
249         return;
250       }
251
252     // restore sent requests to the queue, so they will be re-sent
253     // when the connection opens again
254       queuedRequests.insert(queuedRequests.begin(),
255                               sentRequests.begin(), sentRequests.end());
256       sentRequests.clear();
257     }
258
259     void queueRequest(const Request_ptr& r)
260     {
261         queuedRequests.push_back(r);
262         tryStartNextRequest();
263     }
264
265     void beginResponse()
266     {
267         assert(!sentRequests.empty());
268         assert(state == STATE_WAITING_FOR_RESPONSE);
269
270         activeRequest = sentRequests.front();
271         try {
272             SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
273             activeRequest->responseStart(buffer);
274         } catch (sg_exception& e) {
275             handleError(EIO);
276             return;
277         }
278
279       setState(STATE_GETTING_HEADERS);
280       buffer.clear();
281       if (activeRequest->responseCode() == 204) {
282         noMessageBody = true;
283       } else if (activeRequest->method() == "HEAD") {
284         noMessageBody = true;
285       } else {
286         noMessageBody = false;
287       }
288
289       bodyTransferSize = -1;
290       chunkedTransfer = false;
291       _contentDecoder.reset();
292     }
293
294     void tryStartNextRequest()
295     {
296       while( !queuedRequests.empty()
297           && queuedRequests.front()->isComplete() )
298         queuedRequests.pop_front();
299
300       if (queuedRequests.empty()) {
301         idleTime.stamp();
302         return;
303       }
304
305       if (sentRequests.size() >= _maxPipelineLength) {
306         return;
307       }
308
309       if (state == STATE_CLOSED) {
310           if (!connectToHost()) {
311               setState(STATE_SOCKET_ERROR);
312               return;
313           }
314
315           SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
316           setTerminator("\r\n");
317           setState(STATE_IDLE);
318       }
319
320       Request_ptr r = queuedRequests.front();
321       r->requestStart();
322
323       std::stringstream headerData;
324       std::string path = r->path();
325       assert(!path.empty());
326       std::string query = r->query();
327       std::string bodyData;
328
329       if (!client->proxyHost().empty()) {
330           path = r->scheme() + "://" + r->host() + r->path();
331       }
332
333       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
334           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
335           bodyData = query.substr(1); // URL-encode, drop the leading '?'
336           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
337           headerData << "Content-Length:" << bodyData.size() << "\r\n";
338       } else {
339           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
340           if( r->hasBodyData() )
341           {
342             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
343             headerData << "Content-Type:" << r->bodyType() << "\r\n";
344           }
345       }
346
347       headerData << "Host: " << r->hostAndPort() << "\r\n";
348       headerData << "User-Agent:" << client->userAgent() << "\r\n";
349       headerData << "Accept-Encoding: deflate, gzip\r\n";
350       if (!client->proxyAuth().empty()) {
351           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
352       }
353
354       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
355           headerData << h.first << ": " << h.second << "\r\n";
356       }
357
358       headerData << "\r\n"; // final CRLF to terminate the headers
359       if (!bodyData.empty()) {
360           headerData << bodyData;
361       }
362
363       bool ok = push(headerData.str().c_str());
364       if (!ok) {
365           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
366           // we've over-stuffed the socket, give up for now, let things
367           // drain down before trying to start any more requests.
368           return;
369       }
370
371       if( r->hasBodyData() )
372         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
373         {
374           char buf[4096];
375           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
376           if( len )
377           {
378             if( !bufferSend(buf, len) )
379             {
380               SG_LOG(SG_IO,
381                      SG_WARN,
382                      "overflow the HTTP::Connection output buffer");
383               state = STATE_SOCKET_ERROR;
384               return;
385             }
386             body_bytes_sent += len;
387           }
388           else
389           {
390             SG_LOG(SG_IO,
391                    SG_WARN,
392                    "HTTP asynchronous request body generation is unsupported");
393             break;
394           }
395         }
396
397         SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
398       // successfully sent, remove from queue, and maybe send the next
399       queuedRequests.pop_front();
400       sentRequests.push_back(r);
401       if (state == STATE_IDLE) {
402           setState(STATE_WAITING_FOR_RESPONSE);
403       }
404
405       // pipelining, let's maybe send the next request right away
406       tryStartNextRequest();
407     }
408
409     virtual void collectIncomingData(const char* s, int n)
410     {
411         idleTime.stamp();
412         client->receivedBytes(static_cast<unsigned int>(n));
413
414         if(   (state == STATE_GETTING_BODY)
415            || (state == STATE_GETTING_CHUNKED_BYTES) )
416           _contentDecoder.receivedBytes(s, n);
417         else
418           buffer.append(s, n);
419     }
420
421     virtual void foundTerminator(void)
422     {
423         idleTime.stamp();
424         switch (state) {
425         case STATE_WAITING_FOR_RESPONSE:
426             beginResponse();
427             break;
428
429         case STATE_GETTING_HEADERS:
430             processHeader();
431             buffer.clear();
432             break;
433
434         case STATE_GETTING_BODY:
435             responseComplete();
436             break;
437
438         case STATE_GETTING_CHUNKED:
439             processChunkHeader();
440             break;
441
442         case STATE_GETTING_CHUNKED_BYTES:
443             setTerminator("\r\n");
444             setState(STATE_GETTING_CHUNKED);
445             buffer.clear();
446             break;
447
448
449         case STATE_GETTING_TRAILER:
450             processTrailer();
451             buffer.clear();
452             break;
453
454         case STATE_IDLE:
455             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
456
457         default:
458             break;
459         }
460     }
461
462     bool hasIdleTimeout() const
463     {
464         if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
465             return false;
466         }
467
468         assert(sentRequests.empty());
469         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
470         return isTimedOut;
471     }
472
473     bool hasErrorTimeout() const
474     {
475       if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
476         return false;
477       }
478
479         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
480         return isTimedOut;
481     }
482
483     bool hasError() const
484     {
485         return (state == STATE_SOCKET_ERROR);
486     }
487
488     bool shouldStartNext() const
489     {
490       return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
491     }
492
493     bool isActive() const
494     {
495         return !queuedRequests.empty() || !sentRequests.empty();
496     }
497
498     std::string connectionId() const
499     {
500         return _connectionId;
501     }
502
503     void debugDumpRequests() const
504     {
505         SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
506                << "; state=" << state << ")");
507         if (activeRequest) {
508             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
509         } else {
510             SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
511         }
512
513         BOOST_FOREACH(Request_ptr req, sentRequests) {
514             SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
515         }
516
517         BOOST_FOREACH(Request_ptr req, queuedRequests) {
518             SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
519         }
520     }
521 private:
522     enum ConnectionState {
523         STATE_IDLE = 0,
524         STATE_WAITING_FOR_RESPONSE,
525         STATE_GETTING_HEADERS,
526         STATE_GETTING_BODY,
527         STATE_GETTING_CHUNKED,
528         STATE_GETTING_CHUNKED_BYTES,
529         STATE_GETTING_TRAILER,
530         STATE_SOCKET_ERROR,
531         STATE_CLOSED             ///< connection should be closed now
532     };
533
534     void setState(ConnectionState newState)
535     {
536         if (state == newState) {
537             return;
538         }
539
540         state = newState;
541     }
542
543     bool connectToHost()
544     {
545         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
546
547         if (!open()) {
548             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
549             return false;
550         }
551
552         if (connect(host.c_str(), port) != 0) {
553             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
554             return false;
555         }
556
557         return true;
558     }
559
560
561     void processHeader()
562     {
563         std::string h = strutils::simplify(buffer);
564         if (h.empty()) { // blank line terminates headers
565             headersComplete();
566             return;
567         }
568
569         int colonPos = buffer.find(':');
570         if (colonPos < 0) {
571             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
572             return;
573         }
574
575         std::string key = strutils::simplify(buffer.substr(0, colonPos));
576         std::string lkey = boost::to_lower_copy(key);
577         std::string value = strutils::strip(buffer.substr(colonPos + 1));
578
579         // only consider these if getting headers (as opposed to trailers
580         // of a chunked transfer)
581         if (state == STATE_GETTING_HEADERS) {
582             if (lkey == "content-length") {
583
584                 int sz = strutils::to_int(value);
585                 if (bodyTransferSize <= 0) {
586                     bodyTransferSize = sz;
587                 }
588                 activeRequest->setResponseLength(sz);
589             } else if (lkey == "transfer-length") {
590                 bodyTransferSize = strutils::to_int(value);
591             } else if (lkey == "transfer-encoding") {
592                 processTransferEncoding(value);
593             } else if (lkey == "content-encoding") {
594                 _contentDecoder.setEncoding(value);
595             }
596         }
597
598         activeRequest->responseHeader(lkey, value);
599     }
600
601     void processTransferEncoding(const std::string& te)
602     {
603         if (te == "chunked") {
604             chunkedTransfer = true;
605         } else {
606             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
607             // failure
608         }
609     }
610
611     void processChunkHeader()
612     {
613         if (buffer.empty()) {
614             // blank line after chunk data
615             return;
616         }
617
618         int chunkSize = 0;
619         int semiPos = buffer.find(';');
620         if (semiPos >= 0) {
621             // extensions ignored for the moment
622             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
623         } else {
624             chunkSize = strutils::to_int(buffer, 16);
625         }
626
627         buffer.clear();
628         if (chunkSize == 0) {  //  trailer start
629             setState(STATE_GETTING_TRAILER);
630             return;
631         }
632
633         setState(STATE_GETTING_CHUNKED_BYTES);
634         setByteCount(chunkSize);
635     }
636
637     void processTrailer()
638     {
639         if (buffer.empty()) {
640             // end of trailers
641             responseComplete();
642             return;
643         }
644
645     // process as a normal header
646         processHeader();
647     }
648
649     void headersComplete()
650     {
651         activeRequest->responseHeadersComplete();
652         _contentDecoder.initWithRequest(activeRequest);
653
654         if (chunkedTransfer) {
655             setState(STATE_GETTING_CHUNKED);
656         } else if (noMessageBody || (bodyTransferSize == 0)) {
657             // force the state to GETTING_BODY, to simplify logic in
658             // responseComplete and handleClose
659             setState(STATE_GETTING_BODY);
660             responseComplete();
661         } else {
662             setByteCount(bodyTransferSize); // may be -1, that's fine
663             setState(STATE_GETTING_BODY);
664         }
665     }
666
667     void responseComplete()
668     {
669         Request_ptr completedRequest = activeRequest;
670         _contentDecoder.finish();
671
672         assert(sentRequests.front() == activeRequest);
673         sentRequests.pop_front();
674         bool doClose = activeRequest->closeAfterComplete();
675         activeRequest = NULL;
676
677         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
678             if (doClose) {
679                 SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
680           // this will bring us into handleClose() above, which updates
681           // state to STATE_CLOSED
682               close();
683
684           // if we have additional requests waiting, try to start them now
685               tryStartNextRequest();
686             }
687         }
688
689         if (state != STATE_CLOSED)  {
690             setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
691         }
692
693     // notify request after we change state, so this connection is idle
694     // if completion triggers other requests (which is likely)
695         completedRequest->responseComplete();
696         client->requestFinished(this);
697
698         setTerminator("\r\n");
699     }
700
701     Client* client;
702     Request_ptr activeRequest;
703     ConnectionState state;
704     std::string host;
705     short port;
706     std::string buffer;
707     int bodyTransferSize;
708     SGTimeStamp idleTime;
709     bool chunkedTransfer;
710     bool noMessageBody;
711
712     RequestList queuedRequests;
713     RequestList sentRequests;
714
715     ContentDecoder _contentDecoder;
716     std::string _connectionId;
717     unsigned int _maxPipelineLength;
718 };
719 #endif // of !ENABLE_CURL
720
721 Client::Client() :
722     d(new ClientPrivate)
723 {
724     d->proxyPort = 0;
725     d->maxConnections = 4;
726     d->maxHostConnections = 4;
727     d->bytesTransferred = 0;
728     d->lastTransferRate = 0;
729     d->timeTransferSample.stamp();
730     d->totalBytesDownloaded = 0;
731     d->maxPipelineDepth = 5;
732     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
733 #if defined(ENABLE_CURL)
734     static bool didInitCurlGlobal = false;
735     if (!didInitCurlGlobal) {
736       curl_global_init(CURL_GLOBAL_ALL);
737       didInitCurlGlobal = true;
738     }
739
740     d->createCurlMulti();
741 #endif
742 }
743
744 Client::~Client()
745 {
746 #if defined(ENABLE_CURL)
747   curl_multi_cleanup(d->curlMulti);
748 #endif
749 }
750
751 void Client::setMaxConnections(unsigned int maxCon)
752 {
753     d->maxConnections = maxCon;
754 #if defined(ENABLE_CURL)
755     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
756 #endif
757 }
758
759 void Client::setMaxHostConnections(unsigned int maxHostCon)
760 {
761     d->maxHostConnections = maxHostCon;
762 #if defined(ENABLE_CURL)
763     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
764 #endif
765 }
766
767 void Client::setMaxPipelineDepth(unsigned int depth)
768 {
769     d->maxPipelineDepth = depth;
770 #if defined(ENABLE_CURL)
771     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
772 #else
773     ConnectionDict::iterator it = d->connections.begin();
774     for (; it != d->connections.end(); ) {
775         it->second->setMaxPipelineLength(depth);
776     }
777 #endif
778 }
779
780 void Client::update(int waitTimeout)
781 {
782 #if defined(ENABLE_CURL)
783     int remainingActive, messagesInQueue;
784     curl_multi_perform(d->curlMulti, &remainingActive);
785     d->haveActiveRequests = (remainingActive > 0);
786
787     CURLMsg* msg;
788     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
789       if (msg->msg == CURLMSG_DONE) {
790         Request* req;
791         CURL *e = msg->easy_handle;
792         curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
793
794         long responseCode;
795         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
796
797         if (msg->data.result == 0) {
798           req->responseComplete();
799         } else {
800           fprintf(stderr, "Result: %d - %s\n",
801                 msg->data.result, curl_easy_strerror(msg->data.result));
802           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
803         }
804
805         curl_multi_remove_handle(d->curlMulti, e);
806
807         // balance the reference we take in makeRequest
808         SGReferenced::put(req);
809         curl_easy_cleanup(e);
810       }
811       else {
812         SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
813       }
814     } // of curl message processing loop
815 #else
816     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
817         SGTimeStamp::sleepForMSec(waitTimeout);
818     } else {
819         d->poller.poll(waitTimeout);
820     }
821
822     bool waitingRequests = !d->pendingRequests.empty();
823     ConnectionDict::iterator it = d->connections.begin();
824     for (; it != d->connections.end(); ) {
825         Connection* con = it->second;
826         if (con->hasIdleTimeout() ||
827             con->hasError() ||
828             con->hasErrorTimeout() ||
829             (!con->isActive() && waitingRequests))
830         {
831             if (con->hasErrorTimeout()) {
832                 // tell the connection we're timing it out
833                 con->handleTimeout();
834             }
835
836         // connection has been idle for a while, clean it up
837         // (or if we have requests waiting for a different host,
838         // or an error condition
839             ConnectionDict::iterator del = it++;
840             delete del->second;
841             d->connections.erase(del);
842         } else {
843             if (it->second->shouldStartNext()) {
844                 it->second->tryStartNextRequest();
845             }
846             ++it;
847         }
848     } // of connection iteration
849
850     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
851         RequestList waiting(d->pendingRequests);
852         d->pendingRequests.clear();
853
854         // re-submit all waiting requests in order; this takes care of
855         // finding multiple pending items targetted to the same (new)
856         // connection
857         BOOST_FOREACH(Request_ptr req, waiting) {
858             makeRequest(req);
859         }
860     }
861 #endif
862 }
863
864 void Client::makeRequest(const Request_ptr& r)
865 {
866     if( r->isComplete() )
867       return;
868
869     if( r->url().find("://") == std::string::npos ) {
870         r->setFailure(EINVAL, "malformed URL");
871         return;
872     }
873
874 #if defined(ENABLE_CURL)
875     CURL* curlRequest = curl_easy_init();
876     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
877
878     // manually increase the ref count of the request
879     SGReferenced::get(r.get());
880     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
881     // disable built-in libCurl progress feedback
882     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
883
884     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
885     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
886     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
887     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
888
889     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
890     curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
891
892     if (!d->proxy.empty()) {
893       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
894       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
895
896       if (!d->proxyAuth.empty()) {
897         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
898         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
899       }
900     }
901
902     std::string method = boost::to_lower_copy(r->method());
903     if (method == "get") {
904       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
905     } else if (method == "put") {
906       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
907       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
908     } else if (method == "post") {
909       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
910       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
911
912       std::string q = r->query().substr(1);
913       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
914
915       // reset URL to exclude query pieces
916       std::string urlWithoutQuery = r->url();
917       std::string::size_type queryPos = urlWithoutQuery.find('?');
918       urlWithoutQuery.resize(queryPos);
919       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
920     } else {
921       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
922     }
923
924     struct curl_slist* headerList = NULL;
925     if (r->hasBodyData() && (method != "post")) {
926       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
927       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
928       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
929       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
930       std::string h = "Content-Type:" + r->bodyType();
931       headerList = curl_slist_append(headerList, h.c_str());
932     }
933
934     StringMap::const_iterator it;
935     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
936       std::string h = it->first + ": " + it->second;
937       headerList = curl_slist_append(headerList, h.c_str());
938     }
939
940     if (headerList != NULL) {
941       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
942     }
943
944     curl_multi_add_handle(d->curlMulti, curlRequest);
945     d->haveActiveRequests = true;
946
947 // FIXME - premature?
948     r->requestStart();
949
950 #else
951     if( r->url().find("http://") != 0 ) {
952         r->setFailure(EINVAL, "only HTTP protocol is supported");
953         return;
954     }
955
956     std::string host = r->host();
957     int port = r->port();
958     if (!d->proxy.empty()) {
959         host = d->proxy;
960         port = d->proxyPort;
961     }
962
963     Connection* con = NULL;
964     std::stringstream ss;
965     ss << host << "-" << port;
966     std::string connectionId = ss.str();
967     bool havePending = !d->pendingRequests.empty();
968     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
969     ConnectionDict::iterator consEnd = d->connections.end();
970
971     // assign request to an existing Connection.
972     // various options exist here, examined in order
973     ConnectionDict::iterator it = d->connections.find(connectionId);
974     if (atConnectionsLimit && (it == consEnd)) {
975         // maximum number of connections active, queue this request
976         // when a connection goes inactive, we'll start this one
977         d->pendingRequests.push_back(r);
978         return;
979     }
980
981     // scan for an idle Connection to the same host (likely if we're
982     // retrieving multiple resources from the same host in quick succession)
983     // if we have pending requests (waiting for a free Connection), then
984     // force new requests on this id to always use the first Connection
985     // (instead of the random selection below). This ensures that when
986     // there's pressure on the number of connections to keep alive, one
987     // host can't DoS every other.
988     int count = 0;
989     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
990         if (havePending || !it->second->isActive()) {
991             con = it->second;
992             break;
993         }
994     }
995
996     bool atHostConnectionsLimit = (count >= d->maxHostConnections);
997
998     if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
999         // all current connections are busy (active), and we don't
1000         // have free connections to allocate, so let's assign to
1001         // an existing one randomly. Ideally we'd used whichever one will
1002         // complete first but we don't have that info.
1003         int index = rand() % count;
1004         for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
1005         con = it->second;
1006     }
1007
1008     // allocate a new connection object
1009     if (!con) {
1010         static int connectionSuffx = 0;
1011
1012         std::stringstream ss;
1013         ss << connectionId << "-" << connectionSuffx++;
1014
1015         SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
1016         con = new Connection(this, ss.str());
1017         con->setServer(host, port);
1018         con->setMaxPipelineLength(d->maxPipelineDepth);
1019         d->poller.addChannel(con);
1020         d->connections.insert(d->connections.end(),
1021             ConnectionDict::value_type(connectionId, con));
1022     }
1023
1024     SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
1025     con->queueRequest(r);
1026 #endif
1027 }
1028
1029 //------------------------------------------------------------------------------
1030 FileRequestRef Client::save( const std::string& url,
1031                              const std::string& filename )
1032 {
1033   FileRequestRef req = new FileRequest(url, filename);
1034   makeRequest(req);
1035   return req;
1036 }
1037
1038 //------------------------------------------------------------------------------
1039 MemoryRequestRef Client::load(const std::string& url)
1040 {
1041   MemoryRequestRef req = new MemoryRequest(url);
1042   makeRequest(req);
1043   return req;
1044 }
1045
1046 void Client::requestFinished(Connection* con)
1047 {
1048
1049 }
1050
1051 void Client::setUserAgent(const std::string& ua)
1052 {
1053     d->userAgent = ua;
1054 }
1055
1056 const std::string& Client::userAgent() const
1057 {
1058     return d->userAgent;
1059 }
1060
1061 const std::string& Client::proxyHost() const
1062 {
1063     return d->proxy;
1064 }
1065
1066 const std::string& Client::proxyAuth() const
1067 {
1068     return d->proxyAuth;
1069 }
1070
1071 void Client::setProxy( const std::string& proxy,
1072                        int port,
1073                        const std::string& auth )
1074 {
1075     d->proxy = proxy;
1076     d->proxyPort = port;
1077     d->proxyAuth = auth;
1078 }
1079
1080 bool Client::hasActiveRequests() const
1081 {
1082   #if defined(ENABLE_CURL)
1083     return d->haveActiveRequests;
1084   #else
1085     ConnectionDict::const_iterator it = d->connections.begin();
1086     for (; it != d->connections.end(); ++it) {
1087         if (it->second->isActive()) return true;
1088     }
1089
1090     return false;
1091 #endif
1092 }
1093
1094 void Client::receivedBytes(unsigned int count)
1095 {
1096     d->bytesTransferred += count;
1097     d->totalBytesDownloaded += count;
1098 }
1099
1100 unsigned int Client::transferRateBytesPerSec() const
1101 {
1102     unsigned int e = d->timeTransferSample.elapsedMSec();
1103     if (e > 400) {
1104         // too long a window, ignore
1105         d->timeTransferSample.stamp();
1106         d->bytesTransferred = 0;
1107         d->lastTransferRate = 0;
1108         return 0;
1109     }
1110
1111     if (e < 100) { // avoid really narrow windows
1112         return d->lastTransferRate;
1113     }
1114
1115     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1116     // run a low-pass filter
1117     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1118     smoothed /= 400;
1119
1120     d->timeTransferSample.stamp();
1121     d->bytesTransferred = 0;
1122     d->lastTransferRate = smoothed;
1123     return smoothed;
1124 }
1125
1126 uint64_t Client::totalBytesDownloaded() const
1127 {
1128     return d->totalBytesDownloaded;
1129 }
1130
1131 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1132 {
1133   size_t byteSize = size * nmemb;
1134
1135   Request* req = static_cast<Request*>(userdata);
1136   req->processBodyBytes(ptr, byteSize);
1137   return byteSize;
1138 }
1139
1140 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1141 {
1142   size_t maxBytes = size * nmemb;
1143   Request* req = static_cast<Request*>(userdata);
1144   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1145   return actualBytes;
1146 }
1147
1148 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1149 {
1150   size_t byteSize = size * nitems;
1151   Request* req = static_cast<Request*>(userdata);
1152   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1153
1154   if (req->readyState() == HTTP::Request::OPENED) {
1155     req->responseStart(h);
1156     return byteSize;
1157   }
1158
1159   if (h.empty()) {
1160       // got a 100-continue reponse; restart
1161       if (req->responseCode() == 100) {
1162           req->setReadyState(HTTP::Request::OPENED);
1163           return byteSize;
1164       }
1165
1166     req->responseHeadersComplete();
1167     return byteSize;
1168   }
1169
1170   if (req->responseCode() == 100) {
1171       return byteSize; // skip headers associated with 100-continue status
1172   }
1173
1174   size_t colonPos = h.find(':');
1175   if (colonPos == std::string::npos) {
1176       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1177       return byteSize;
1178   }
1179
1180   std::string key = strutils::simplify(h.substr(0, colonPos));
1181   std::string lkey = boost::to_lower_copy(key);
1182   std::string value = strutils::strip(h.substr(colonPos + 1));
1183
1184   req->responseHeader(lkey, value);
1185   return byteSize;
1186 }
1187
1188 void Client::debugDumpRequests()
1189 {
1190 #if defined(ENABLE_CURL)
1191
1192 #else
1193     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1194     ConnectionDict::iterator it = d->connections.begin();
1195     for (; it != d->connections.end(); ++it) {
1196         it->second->debugDumpRequests();
1197     }
1198     SG_LOG(SG_IO, SG_INFO, "==");
1199 #endif
1200 }
1201
1202 void Client::clearAllConnections()
1203 {
1204 #if defined(ENABLE_CURL)
1205     curl_multi_cleanup(d->curlMulti);
1206     d->createCurlMulti();
1207 #else
1208     ConnectionDict::iterator it = d->connections.begin();
1209     for (; it != d->connections.end(); ++it) {
1210         delete it->second;
1211     }
1212     d->connections.clear();
1213 #endif
1214 }
1215
1216 } // of namespace HTTP
1217
1218 } // of namespace simgear