]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
6ec17c49b8338c29c1230d8d3fd578defc3ed77f
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimGear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
71 const unsigned int MAX_INFLIGHT_REQUESTS = 4;
72
73 class Connection;
74 typedef std::multimap<std::string, Connection*> ConnectionDict;
75 typedef std::list<Request_ptr> RequestList;
76
77 class Client::ClientPrivate
78 {
79 public:
80 #if defined(ENABLE_CURL)
81     CURLM* curlMulti;
82     bool haveActiveRequests;
83
84     void createCurlMulti()
85     {
86         curlMulti = curl_multi_init();
87         // see https://curl.haxx.se/libcurl/c/CURLMOPT_PIPELINING.html
88         // we request HTTP 1.1 pipelining
89         curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
90         curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
91         curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
92                           (long) MAX_INFLIGHT_REQUESTS);
93
94     }
95 #else
96     NetChannelPoller poller;
97 // connections by host (potentially more than one)
98     ConnectionDict connections;
99 #endif
100
101     std::string userAgent;
102     std::string proxy;
103     int proxyPort;
104     std::string proxyAuth;
105     unsigned int maxConnections;
106
107     RequestList pendingRequests;
108
109
110
111     SGTimeStamp timeTransferSample;
112     unsigned int bytesTransferred;
113     unsigned int lastTransferRate;
114     uint64_t totalBytesDownloaded;
115 };
116
117 #if !defined(ENABLE_CURL)
118 class Connection : public NetChat
119 {
120 public:
121     Connection(Client* pr, const std::string& conId) :
122         client(pr),
123         state(STATE_CLOSED),
124         port(DEFAULT_HTTP_PORT),
125         connectionId(conId)
126     {
127     }
128
129     virtual ~Connection()
130     {
131     }
132
133     virtual void handleBufferRead (NetBuffer& buffer)
134     {
135       if( !activeRequest || !activeRequest->isComplete() )
136         return NetChat::handleBufferRead(buffer);
137
138       // Request should be aborted (signaled by setting its state to complete).
139
140       // force the state to GETTING_BODY, to simplify logic in
141       // responseComplete and handleClose
142       state = STATE_GETTING_BODY;
143       responseComplete();
144     }
145
146     void setServer(const std::string& h, short p)
147     {
148         host = h;
149         port = p;
150     }
151
152     // socket-level errors
153     virtual void handleError(int error)
154     {
155         const char* errStr = strerror(error);
156         SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
157                << errStr << ")");
158
159         debugDumpRequests();
160
161         if (!activeRequest)
162         {
163         // connection level failure, eg name lookup or routing
164         // we won't have an active request yet, so let's fail all of the
165         // requests since we presume it's a systematic failure for
166         // the host in question
167             BOOST_FOREACH(Request_ptr req, sentRequests) {
168                 req->setFailure(error, errStr);
169             }
170
171             BOOST_FOREACH(Request_ptr req, queuedRequests) {
172                 req->setFailure(error, errStr);
173             }
174
175             sentRequests.clear();
176             queuedRequests.clear();
177         }
178
179         NetChat::handleError(error);
180         if (activeRequest) {
181             activeRequest->setFailure(error, errStr);
182             activeRequest = NULL;
183             _contentDecoder.reset();
184         }
185
186         state = STATE_SOCKET_ERROR;
187     }
188
189     void handleTimeout()
190     {
191         handleError(ETIMEDOUT);
192     }
193
194     virtual void handleClose()
195     {
196         NetChat::handleClose();
197
198     // closing of the connection from the server side when getting the body,
199         bool canCloseState = (state == STATE_GETTING_BODY);
200         if (canCloseState && activeRequest) {
201         // force state here, so responseComplete can avoid closing the
202         // socket again
203             state =  STATE_CLOSED;
204             responseComplete();
205         } else {
206             if (state == STATE_WAITING_FOR_RESPONSE) {
207                 assert(!sentRequests.empty());
208                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
209                 // no active request, but don't restore the front sent one
210                 sentRequests.erase(sentRequests.begin());
211             }
212
213             if (activeRequest) {
214                 activeRequest->setFailure(500, "server closed connection");
215                 // remove the failed request from sentRequests, so it does
216                 // not get restored
217                 RequestList::iterator it = std::find(sentRequests.begin(),
218                     sentRequests.end(), activeRequest);
219                 if (it != sentRequests.end()) {
220                     sentRequests.erase(it);
221                 }
222                 activeRequest = NULL;
223                 _contentDecoder.reset();
224             }
225
226             state = STATE_CLOSED;
227         }
228
229       if (sentRequests.empty()) {
230         return;
231       }
232
233     // restore sent requests to the queue, so they will be re-sent
234     // when the connection opens again
235       queuedRequests.insert(queuedRequests.begin(),
236                               sentRequests.begin(), sentRequests.end());
237       sentRequests.clear();
238     }
239
240     void queueRequest(const Request_ptr& r)
241     {
242         queuedRequests.push_back(r);
243         tryStartNextRequest();
244     }
245
246     void beginResponse()
247     {
248         assert(!sentRequests.empty());
249         assert(state == STATE_WAITING_FOR_RESPONSE);
250
251         activeRequest = sentRequests.front();
252         try {
253             activeRequest->responseStart(buffer);
254         } catch (sg_exception& e) {
255             handleError(EIO);
256             return;
257         }
258
259       state = STATE_GETTING_HEADERS;
260       buffer.clear();
261       if (activeRequest->responseCode() == 204) {
262         noMessageBody = true;
263       } else if (activeRequest->method() == "HEAD") {
264         noMessageBody = true;
265       } else {
266         noMessageBody = false;
267       }
268
269       bodyTransferSize = -1;
270       chunkedTransfer = false;
271       _contentDecoder.reset();
272     }
273
274     void tryStartNextRequest()
275     {
276       while( !queuedRequests.empty()
277           && queuedRequests.front()->isComplete() )
278         queuedRequests.pop_front();
279
280       if (queuedRequests.empty()) {
281         idleTime.stamp();
282         return;
283       }
284
285       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
286         return;
287       }
288
289       if (state == STATE_CLOSED) {
290           if (!connectToHost()) {
291
292               return;
293           }
294
295           setTerminator("\r\n");
296           state = STATE_IDLE;
297       }
298
299       Request_ptr r = queuedRequests.front();
300       r->requestStart();
301
302       std::stringstream headerData;
303       std::string path = r->path();
304       assert(!path.empty());
305       std::string query = r->query();
306       std::string bodyData;
307
308       if (!client->proxyHost().empty()) {
309           path = r->scheme() + "://" + r->host() + r->path();
310       }
311
312       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
313           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
314           bodyData = query.substr(1); // URL-encode, drop the leading '?'
315           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
316           headerData << "Content-Length:" << bodyData.size() << "\r\n";
317       } else {
318           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
319           if( r->hasBodyData() )
320           {
321             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
322             headerData << "Content-Type:" << r->bodyType() << "\r\n";
323           }
324       }
325
326       headerData << "Host: " << r->hostAndPort() << "\r\n";
327       headerData << "User-Agent:" << client->userAgent() << "\r\n";
328       headerData << "Accept-Encoding: deflate, gzip\r\n";
329       if (!client->proxyAuth().empty()) {
330           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
331       }
332
333       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
334           headerData << h.first << ": " << h.second << "\r\n";
335       }
336
337       headerData << "\r\n"; // final CRLF to terminate the headers
338       if (!bodyData.empty()) {
339           headerData << bodyData;
340       }
341
342       bool ok = push(headerData.str().c_str());
343       if (!ok) {
344           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
345           // we've over-stuffed the socket, give up for now, let things
346           // drain down before trying to start any more requests.
347           return;
348       }
349
350       if( r->hasBodyData() )
351         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
352         {
353           char buf[4096];
354           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
355           if( len )
356           {
357             if( !bufferSend(buf, len) )
358             {
359               SG_LOG(SG_IO,
360                      SG_WARN,
361                      "overflow the HTTP::Connection output buffer");
362               state = STATE_SOCKET_ERROR;
363               return;
364             }
365             body_bytes_sent += len;
366           }
367           else
368           {
369             SG_LOG(SG_IO,
370                    SG_WARN,
371                    "HTTP asynchronous request body generation is unsupported");
372             break;
373           }
374         }
375
376         SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
377       // successfully sent, remove from queue, and maybe send the next
378       queuedRequests.pop_front();
379       sentRequests.push_back(r);
380       if (state == STATE_IDLE) {
381           state = STATE_WAITING_FOR_RESPONSE;
382       }
383
384       // pipelining, let's maybe send the next request right away
385       tryStartNextRequest();
386     }
387
388     virtual void collectIncomingData(const char* s, int n)
389     {
390         idleTime.stamp();
391         client->receivedBytes(static_cast<unsigned int>(n));
392
393         if(   (state == STATE_GETTING_BODY)
394            || (state == STATE_GETTING_CHUNKED_BYTES) )
395           _contentDecoder.receivedBytes(s, n);
396         else
397           buffer.append(s, n);
398     }
399
400     virtual void foundTerminator(void)
401     {
402         idleTime.stamp();
403         switch (state) {
404         case STATE_WAITING_FOR_RESPONSE:
405             beginResponse();
406             break;
407
408         case STATE_GETTING_HEADERS:
409             processHeader();
410             buffer.clear();
411             break;
412
413         case STATE_GETTING_BODY:
414             responseComplete();
415             break;
416
417         case STATE_GETTING_CHUNKED:
418             processChunkHeader();
419             break;
420
421         case STATE_GETTING_CHUNKED_BYTES:
422             setTerminator("\r\n");
423             state = STATE_GETTING_CHUNKED;
424             buffer.clear();
425             break;
426
427
428         case STATE_GETTING_TRAILER:
429             processTrailer();
430             buffer.clear();
431             break;
432
433         case STATE_IDLE:
434             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
435
436         default:
437             break;
438         }
439     }
440
441     bool hasIdleTimeout() const
442     {
443         if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
444             return false;
445         }
446
447         assert(sentRequests.empty());
448         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
449         return isTimedOut;
450     }
451
452     bool hasErrorTimeout() const
453     {
454       if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
455         return false;
456       }
457
458         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
459         return isTimedOut;
460     }
461
462     bool hasError() const
463     {
464         return (state == STATE_SOCKET_ERROR);
465     }
466
467     bool shouldStartNext() const
468     {
469       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
470     }
471
472     bool isActive() const
473     {
474         return !queuedRequests.empty() || !sentRequests.empty();
475     }
476
477     void debugDumpRequests() const
478     {
479         SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
480                << "; state=" << state << ")");
481         if (activeRequest) {
482             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
483         } else {
484             SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
485         }
486
487         BOOST_FOREACH(Request_ptr req, sentRequests) {
488             SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
489         }
490
491         BOOST_FOREACH(Request_ptr req, queuedRequests) {
492             SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
493         }
494     }
495 private:
496     bool connectToHost()
497     {
498         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
499
500         if (!open()) {
501             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
502             return false;
503         }
504
505         if (connect(host.c_str(), port) != 0) {
506             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
507             return false;
508         }
509
510         return true;
511     }
512
513
514     void processHeader()
515     {
516         std::string h = strutils::simplify(buffer);
517         if (h.empty()) { // blank line terminates headers
518             headersComplete();
519             return;
520         }
521
522         int colonPos = buffer.find(':');
523         if (colonPos < 0) {
524             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
525             return;
526         }
527
528         std::string key = strutils::simplify(buffer.substr(0, colonPos));
529         std::string lkey = boost::to_lower_copy(key);
530         std::string value = strutils::strip(buffer.substr(colonPos + 1));
531
532         // only consider these if getting headers (as opposed to trailers
533         // of a chunked transfer)
534         if (state == STATE_GETTING_HEADERS) {
535             if (lkey == "content-length") {
536
537                 int sz = strutils::to_int(value);
538                 if (bodyTransferSize <= 0) {
539                     bodyTransferSize = sz;
540                 }
541                 activeRequest->setResponseLength(sz);
542             } else if (lkey == "transfer-length") {
543                 bodyTransferSize = strutils::to_int(value);
544             } else if (lkey == "transfer-encoding") {
545                 processTransferEncoding(value);
546             } else if (lkey == "content-encoding") {
547                 _contentDecoder.setEncoding(value);
548             }
549         }
550
551         activeRequest->responseHeader(lkey, value);
552     }
553
554     void processTransferEncoding(const std::string& te)
555     {
556         if (te == "chunked") {
557             chunkedTransfer = true;
558         } else {
559             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
560             // failure
561         }
562     }
563
564     void processChunkHeader()
565     {
566         if (buffer.empty()) {
567             // blank line after chunk data
568             return;
569         }
570
571         int chunkSize = 0;
572         int semiPos = buffer.find(';');
573         if (semiPos >= 0) {
574             // extensions ignored for the moment
575             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
576         } else {
577             chunkSize = strutils::to_int(buffer, 16);
578         }
579
580         buffer.clear();
581         if (chunkSize == 0) {  //  trailer start
582             state = STATE_GETTING_TRAILER;
583             return;
584         }
585
586         state = STATE_GETTING_CHUNKED_BYTES;
587         setByteCount(chunkSize);
588     }
589
590     void processTrailer()
591     {
592         if (buffer.empty()) {
593             // end of trailers
594             responseComplete();
595             return;
596         }
597
598     // process as a normal header
599         processHeader();
600     }
601
602     void headersComplete()
603     {
604         activeRequest->responseHeadersComplete();
605         _contentDecoder.initWithRequest(activeRequest);
606
607         if (chunkedTransfer) {
608             state = STATE_GETTING_CHUNKED;
609         } else if (noMessageBody || (bodyTransferSize == 0)) {
610             // force the state to GETTING_BODY, to simplify logic in
611             // responseComplete and handleClose
612             state = STATE_GETTING_BODY;
613             responseComplete();
614         } else {
615             setByteCount(bodyTransferSize); // may be -1, that's fine
616             state = STATE_GETTING_BODY;
617         }
618     }
619
620     void responseComplete()
621     {
622         Request_ptr completedRequest = activeRequest;
623         _contentDecoder.finish();
624
625         assert(sentRequests.front() == activeRequest);
626         sentRequests.pop_front();
627         bool doClose = activeRequest->closeAfterComplete();
628         activeRequest = NULL;
629
630         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
631             if (doClose) {
632           // this will bring us into handleClose() above, which updates
633           // state to STATE_CLOSED
634               close();
635
636           // if we have additional requests waiting, try to start them now
637               tryStartNextRequest();
638             }
639         }
640
641         if (state != STATE_CLOSED)  {
642             state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
643         }
644
645     // notify request after we change state, so this connection is idle
646     // if completion triggers other requests (which is likely)
647         completedRequest->responseComplete();
648         client->requestFinished(this);
649
650         setTerminator("\r\n");
651     }
652
653     enum ConnectionState {
654         STATE_IDLE = 0,
655         STATE_WAITING_FOR_RESPONSE,
656         STATE_GETTING_HEADERS,
657         STATE_GETTING_BODY,
658         STATE_GETTING_CHUNKED,
659         STATE_GETTING_CHUNKED_BYTES,
660         STATE_GETTING_TRAILER,
661         STATE_SOCKET_ERROR,
662         STATE_CLOSED             ///< connection should be closed now
663     };
664
665     Client* client;
666     Request_ptr activeRequest;
667     ConnectionState state;
668     std::string host;
669     short port;
670     std::string buffer;
671     int bodyTransferSize;
672     SGTimeStamp idleTime;
673     bool chunkedTransfer;
674     bool noMessageBody;
675
676     RequestList queuedRequests;
677     RequestList sentRequests;
678
679     ContentDecoder _contentDecoder;
680     std::string connectionId;
681 };
682 #endif // of !ENABLE_CURL
683
684 Client::Client() :
685     d(new ClientPrivate)
686 {
687     d->proxyPort = 0;
688     d->maxConnections = 4;
689     d->bytesTransferred = 0;
690     d->lastTransferRate = 0;
691     d->timeTransferSample.stamp();
692     d->totalBytesDownloaded = 0;
693
694     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
695 #if defined(ENABLE_CURL)
696     static bool didInitCurlGlobal = false;
697     if (!didInitCurlGlobal) {
698       curl_global_init(CURL_GLOBAL_ALL);
699       didInitCurlGlobal = true;
700     }
701
702     d->createCurlMulti();
703 #endif
704 }
705
706 Client::~Client()
707 {
708 #if defined(ENABLE_CURL)
709   curl_multi_cleanup(d->curlMulti);
710 #endif
711 }
712
713 void Client::setMaxConnections(unsigned int maxCon)
714 {
715     if (maxCon < 1) {
716         throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
717     }
718
719     d->maxConnections = maxCon;
720 #if defined(ENABLE_CURL)
721     curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
722 #endif
723 }
724
725 void Client::update(int waitTimeout)
726 {
727 #if defined(ENABLE_CURL)
728     int remainingActive, messagesInQueue;
729     curl_multi_perform(d->curlMulti, &remainingActive);
730     d->haveActiveRequests = (remainingActive > 0);
731
732     CURLMsg* msg;
733     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
734       if (msg->msg == CURLMSG_DONE) {
735         Request* req;
736         CURL *e = msg->easy_handle;
737         curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
738
739         long responseCode;
740         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
741
742         if (msg->data.result == 0) {
743           req->responseComplete();
744         } else {
745           fprintf(stderr, "Result: %d - %s\n",
746                 msg->data.result, curl_easy_strerror(msg->data.result));
747           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
748         }
749
750         curl_multi_remove_handle(d->curlMulti, e);
751
752         // balance the reference we take in makeRequest
753         SGReferenced::put(req);
754         curl_easy_cleanup(e);
755       }
756       else {
757         SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
758       }
759     } // of curl message processing loop
760 #else
761     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
762         SGTimeStamp::sleepForMSec(waitTimeout);
763     } else {
764         d->poller.poll(waitTimeout);
765     }
766
767     bool waitingRequests = !d->pendingRequests.empty();
768     ConnectionDict::iterator it = d->connections.begin();
769     for (; it != d->connections.end(); ) {
770         Connection* con = it->second;
771         if (con->hasIdleTimeout() ||
772             con->hasError() ||
773             con->hasErrorTimeout() ||
774             (!con->isActive() && waitingRequests))
775         {
776             if (con->hasErrorTimeout()) {
777                 // tell the connection we're timing it out
778                 con->handleTimeout();
779             }
780
781         // connection has been idle for a while, clean it up
782         // (or if we have requests waiting for a different host,
783         // or an error condition
784             ConnectionDict::iterator del = it++;
785             delete del->second;
786             d->connections.erase(del);
787         } else {
788             if (it->second->shouldStartNext()) {
789                 it->second->tryStartNextRequest();
790             }
791             ++it;
792         }
793     } // of connection iteration
794
795     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
796         RequestList waiting(d->pendingRequests);
797         d->pendingRequests.clear();
798
799         // re-submit all waiting requests in order; this takes care of
800         // finding multiple pending items targetted to the same (new)
801         // connection
802         BOOST_FOREACH(Request_ptr req, waiting) {
803             makeRequest(req);
804         }
805     }
806 #endif
807 }
808
809 void Client::makeRequest(const Request_ptr& r)
810 {
811     if( r->isComplete() )
812       return;
813
814     if( r->url().find("://") == std::string::npos ) {
815         r->setFailure(EINVAL, "malformed URL");
816         return;
817     }
818
819 #if defined(ENABLE_CURL)
820     CURL* curlRequest = curl_easy_init();
821     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
822
823     // manually increase the ref count of the request
824     SGReferenced::get(r.get());
825     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
826     // disable built-in libCurl progress feedback
827     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
828
829     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
830     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
831     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
832     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
833
834     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
835     curl_easy_setopt(curlRequest, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
836
837     if (!d->proxy.empty()) {
838       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
839       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
840
841       if (!d->proxyAuth.empty()) {
842         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
843         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
844       }
845     }
846
847     std::string method = boost::to_lower_copy(r->method());
848     if (method == "get") {
849       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
850     } else if (method == "put") {
851       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
852       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
853     } else if (method == "post") {
854       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
855       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
856
857       std::string q = r->query().substr(1);
858       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
859
860       // reset URL to exclude query pieces
861       std::string urlWithoutQuery = r->url();
862       std::string::size_type queryPos = urlWithoutQuery.find('?');
863       urlWithoutQuery.resize(queryPos);
864       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
865     } else {
866       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
867     }
868
869     struct curl_slist* headerList = NULL;
870     if (r->hasBodyData() && (method != "post")) {
871       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
872       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
873       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
874       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
875       std::string h = "Content-Type:" + r->bodyType();
876       headerList = curl_slist_append(headerList, h.c_str());
877     }
878
879     StringMap::const_iterator it;
880     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
881       std::string h = it->first + ": " + it->second;
882       headerList = curl_slist_append(headerList, h.c_str());
883     }
884
885     if (headerList != NULL) {
886       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
887     }
888
889     curl_multi_add_handle(d->curlMulti, curlRequest);
890     d->haveActiveRequests = true;
891
892 // FIXME - premature?
893     r->requestStart();
894
895 #else
896     if( r->url().find("http://") != 0 ) {
897         r->setFailure(EINVAL, "only HTTP protocol is supported");
898         return;
899     }
900
901     std::string host = r->host();
902     int port = r->port();
903     if (!d->proxy.empty()) {
904         host = d->proxy;
905         port = d->proxyPort;
906     }
907
908     Connection* con = NULL;
909     std::stringstream ss;
910     ss << host << "-" << port;
911     std::string connectionId = ss.str();
912     bool havePending = !d->pendingRequests.empty();
913     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
914     ConnectionDict::iterator consEnd = d->connections.end();
915
916     // assign request to an existing Connection.
917     // various options exist here, examined in order
918     ConnectionDict::iterator it = d->connections.find(connectionId);
919     if (atConnectionsLimit && (it == consEnd)) {
920         // maximum number of connections active, queue this request
921         // when a connection goes inactive, we'll start this one
922         d->pendingRequests.push_back(r);
923         return;
924     }
925
926     // scan for an idle Connection to the same host (likely if we're
927     // retrieving multiple resources from the same host in quick succession)
928     // if we have pending requests (waiting for a free Connection), then
929     // force new requests on this id to always use the first Connection
930     // (instead of the random selection below). This ensures that when
931     // there's pressure on the number of connections to keep alive, one
932     // host can't DoS every other.
933     int count = 0;
934     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
935         if (havePending || !it->second->isActive()) {
936             con = it->second;
937             break;
938         }
939     }
940
941     if (!con && atConnectionsLimit) {
942         // all current connections are busy (active), and we don't
943         // have free connections to allocate, so let's assign to
944         // an existing one randomly. Ideally we'd used whichever one will
945         // complete first but we don't have that info.
946         int index = rand() % count;
947         for (it = d->connections.find(connectionId); index > 0; --index) { ; }
948         con = it->second;
949     }
950
951     // allocate a new connection object
952     if (!con) {
953         con = new Connection(this, connectionId);
954         con->setServer(host, port);
955         d->poller.addChannel(con);
956         d->connections.insert(d->connections.end(),
957             ConnectionDict::value_type(connectionId, con));
958     }
959
960     con->queueRequest(r);
961 #endif
962 }
963
964 //------------------------------------------------------------------------------
965 FileRequestRef Client::save( const std::string& url,
966                              const std::string& filename )
967 {
968   FileRequestRef req = new FileRequest(url, filename);
969   makeRequest(req);
970   return req;
971 }
972
973 //------------------------------------------------------------------------------
974 MemoryRequestRef Client::load(const std::string& url)
975 {
976   MemoryRequestRef req = new MemoryRequest(url);
977   makeRequest(req);
978   return req;
979 }
980
981 void Client::requestFinished(Connection* con)
982 {
983
984 }
985
986 void Client::setUserAgent(const std::string& ua)
987 {
988     d->userAgent = ua;
989 }
990
991 const std::string& Client::userAgent() const
992 {
993     return d->userAgent;
994 }
995
996 const std::string& Client::proxyHost() const
997 {
998     return d->proxy;
999 }
1000
1001 const std::string& Client::proxyAuth() const
1002 {
1003     return d->proxyAuth;
1004 }
1005
1006 void Client::setProxy( const std::string& proxy,
1007                        int port,
1008                        const std::string& auth )
1009 {
1010     d->proxy = proxy;
1011     d->proxyPort = port;
1012     d->proxyAuth = auth;
1013 }
1014
1015 bool Client::hasActiveRequests() const
1016 {
1017   #if defined(ENABLE_CURL)
1018     return d->haveActiveRequests;
1019   #else
1020     ConnectionDict::const_iterator it = d->connections.begin();
1021     for (; it != d->connections.end(); ++it) {
1022         if (it->second->isActive()) return true;
1023     }
1024
1025     return false;
1026 #endif
1027 }
1028
1029 void Client::receivedBytes(unsigned int count)
1030 {
1031     d->bytesTransferred += count;
1032     d->totalBytesDownloaded += count;
1033 }
1034
1035 unsigned int Client::transferRateBytesPerSec() const
1036 {
1037     unsigned int e = d->timeTransferSample.elapsedMSec();
1038     if (e > 400) {
1039         // too long a window, ignore
1040         d->timeTransferSample.stamp();
1041         d->bytesTransferred = 0;
1042         d->lastTransferRate = 0;
1043         return 0;
1044     }
1045
1046     if (e < 100) { // avoid really narrow windows
1047         return d->lastTransferRate;
1048     }
1049
1050     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1051     // run a low-pass filter
1052     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1053     smoothed /= 400;
1054
1055     d->timeTransferSample.stamp();
1056     d->bytesTransferred = 0;
1057     d->lastTransferRate = smoothed;
1058     return smoothed;
1059 }
1060
1061 uint64_t Client::totalBytesDownloaded() const
1062 {
1063     return d->totalBytesDownloaded;
1064 }
1065
1066 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1067 {
1068   size_t byteSize = size * nmemb;
1069
1070   Request* req = static_cast<Request*>(userdata);
1071   req->processBodyBytes(ptr, byteSize);
1072   return byteSize;
1073 }
1074
1075 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1076 {
1077   size_t maxBytes = size * nmemb;
1078   Request* req = static_cast<Request*>(userdata);
1079   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1080   return actualBytes;
1081 }
1082
1083 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1084 {
1085   size_t byteSize = size * nitems;
1086   Request* req = static_cast<Request*>(userdata);
1087   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1088
1089   if (req->readyState() == HTTP::Request::OPENED) {
1090     req->responseStart(h);
1091     return byteSize;
1092   }
1093
1094   if (h.empty()) {
1095       // got a 100-continue reponse; restart
1096       if (req->responseCode() == 100) {
1097           req->setReadyState(HTTP::Request::OPENED);
1098           return byteSize;
1099       }
1100
1101     req->responseHeadersComplete();
1102     return byteSize;
1103   }
1104
1105   if (req->responseCode() == 100) {
1106       return byteSize; // skip headers associated with 100-continue status
1107   }
1108
1109   size_t colonPos = h.find(':');
1110   if (colonPos == std::string::npos) {
1111       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1112       return byteSize;
1113   }
1114
1115   std::string key = strutils::simplify(h.substr(0, colonPos));
1116   std::string lkey = boost::to_lower_copy(key);
1117   std::string value = strutils::strip(h.substr(colonPos + 1));
1118
1119   req->responseHeader(lkey, value);
1120   return byteSize;
1121 }
1122
1123 void Client::debugDumpRequests()
1124 {
1125 #if defined(ENABLE_CURL)
1126
1127 #else
1128     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1129     ConnectionDict::iterator it = d->connections.begin();
1130     for (; it != d->connections.end(); ++it) {
1131         it->second->debugDumpRequests();
1132     }
1133     SG_LOG(SG_IO, SG_INFO, "==");
1134 #endif
1135 }
1136
1137 void Client::clearAllConnections()
1138 {
1139 #if defined(ENABLE_CURL)
1140     curl_multi_cleanup(d->curlMulti);
1141     d->createCurlMulti();
1142 #else
1143     ConnectionDict::iterator it = d->connections.begin();
1144     for (; it != d->connections.end(); ++it) {
1145         delete it->second;
1146     }
1147     d->connections.clear();
1148 #endif
1149 }
1150
1151 } // of namespace HTTP
1152
1153 } // of namespace simgear