]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
2a6d41e2feb92d511553107ce44bbdbe7306fc0c
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
2  * \file HTTPClient.cxx - simple HTTP client engine for SimGear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
69 extern const int DEFAULT_HTTP_PORT = 80;
70 const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
71 const unsigned int MAX_INFLIGHT_REQUESTS = 32;
72
73 class Connection;
74 typedef std::multimap<std::string, Connection*> ConnectionDict;
75 typedef std::list<Request_ptr> RequestList;
76
77 class Client::ClientPrivate
78 {
79 public:
80 #if defined(ENABLE_CURL)
81     CURLM* curlMulti;
82     bool haveActiveRequests;
83 #else
84     NetChannelPoller poller;
85 // connections by host (potentially more than one)
86     ConnectionDict connections;
87 #endif
88
89     std::string userAgent;
90     std::string proxy;
91     int proxyPort;
92     std::string proxyAuth;
93     unsigned int maxConnections;
94
95     RequestList pendingRequests;
96
97
98
99     SGTimeStamp timeTransferSample;
100     unsigned int bytesTransferred;
101     unsigned int lastTransferRate;
102     uint64_t totalBytesDownloaded;
103 };
104
105 #if !defined(ENABLE_CURL)
106 class Connection : public NetChat
107 {
108 public:
109     Connection(Client* pr) :
110         client(pr),
111         state(STATE_CLOSED),
112         port(DEFAULT_HTTP_PORT)
113     {
114     }
115
116     virtual ~Connection()
117     {
118     }
119
120     virtual void handleBufferRead (NetBuffer& buffer)
121     {
122       if( !activeRequest || !activeRequest->isComplete() )
123         return NetChat::handleBufferRead(buffer);
124
125       // Request should be aborted (signaled by setting its state to complete).
126
127       // force the state to GETTING_BODY, to simplify logic in
128       // responseComplete and handleClose
129       state = STATE_GETTING_BODY;
130       responseComplete();
131     }
132
133     void setServer(const std::string& h, short p)
134     {
135         host = h;
136         port = p;
137     }
138
139     // socket-level errors
140     virtual void handleError(int error)
141     {
142         const char* errStr = strerror(error);
143         if (!activeRequest)
144         {
145         // connection level failure, eg name lookup or routing
146         // we won't have an active request yet, so let's fail all of the
147         // requests since we presume it's a systematic failure for
148         // the host in question
149             BOOST_FOREACH(Request_ptr req, sentRequests) {
150                 req->setFailure(error, errStr);
151             }
152
153             BOOST_FOREACH(Request_ptr req, queuedRequests) {
154                 req->setFailure(error, errStr);
155             }
156
157             sentRequests.clear();
158             queuedRequests.clear();
159         }
160
161         NetChat::handleError(error);
162         if (activeRequest) {
163             activeRequest->setFailure(error, errStr);
164             activeRequest = NULL;
165             _contentDecoder.reset();
166         }
167
168         state = STATE_SOCKET_ERROR;
169     }
170
171     virtual void handleClose()
172     {
173         NetChat::handleClose();
174
175     // closing of the connection from the server side when getting the body,
176         bool canCloseState = (state == STATE_GETTING_BODY);
177         if (canCloseState && activeRequest) {
178         // force state here, so responseComplete can avoid closing the
179         // socket again
180             state =  STATE_CLOSED;
181             responseComplete();
182         } else {
183             if (activeRequest) {
184                 activeRequest->setFailure(500, "server closed connection");
185                 // remove the failed request from sentRequests, so it does
186                 // not get restored
187                 RequestList::iterator it = std::find(sentRequests.begin(),
188                     sentRequests.end(), activeRequest);
189                 if (it != sentRequests.end()) {
190                     sentRequests.erase(it);
191                 }
192                 activeRequest = NULL;
193                 _contentDecoder.reset();
194             }
195
196             state = STATE_CLOSED;
197         }
198
199       if (sentRequests.empty()) {
200         return;
201       }
202
203     // restore sent requests to the queue, so they will be re-sent
204     // when the connection opens again
205       queuedRequests.insert(queuedRequests.begin(),
206                               sentRequests.begin(), sentRequests.end());
207       sentRequests.clear();
208     }
209
210     void handleTimeout()
211     {
212         NetChat::handleError(ETIMEDOUT);
213         if (activeRequest) {
214             SG_LOG(SG_IO, SG_DEBUG, "HTTP socket timeout");
215             activeRequest->setFailure(ETIMEDOUT, "socket timeout");
216             activeRequest = NULL;
217             _contentDecoder.reset();
218         }
219
220         state = STATE_SOCKET_ERROR;
221     }
222
223     void queueRequest(const Request_ptr& r)
224     {
225         queuedRequests.push_back(r);
226         tryStartNextRequest();
227     }
228
229     void beginResponse()
230     {
231         assert(!sentRequests.empty());
232         assert(state == STATE_WAITING_FOR_RESPONSE);
233
234         activeRequest = sentRequests.front();
235         try {
236             activeRequest->responseStart(buffer);
237         } catch (sg_exception& e) {
238             handleError(EIO);
239             return;
240         }
241
242       state = STATE_GETTING_HEADERS;
243       buffer.clear();
244       if (activeRequest->responseCode() == 204) {
245         noMessageBody = true;
246       } else if (activeRequest->method() == "HEAD") {
247         noMessageBody = true;
248       } else {
249         noMessageBody = false;
250       }
251
252       bodyTransferSize = -1;
253       chunkedTransfer = false;
254       _contentDecoder.reset();
255     }
256
257     void tryStartNextRequest()
258     {
259       while( !queuedRequests.empty()
260           && queuedRequests.front()->isComplete() )
261         queuedRequests.pop_front();
262
263       if (queuedRequests.empty()) {
264         idleTime.stamp();
265         return;
266       }
267
268       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
269         return;
270       }
271
272       if (state == STATE_CLOSED) {
273           if (!connectToHost()) {
274
275               return;
276           }
277
278           setTerminator("\r\n");
279           state = STATE_IDLE;
280       }
281
282       Request_ptr r = queuedRequests.front();
283       r->requestStart();
284
285       std::stringstream headerData;
286       std::string path = r->path();
287       assert(!path.empty());
288       std::string query = r->query();
289       std::string bodyData;
290
291       if (!client->proxyHost().empty()) {
292           path = r->scheme() + "://" + r->host() + r->path();
293       }
294
295       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
296           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
297           bodyData = query.substr(1); // URL-encode, drop the leading '?'
298           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
299           headerData << "Content-Length:" << bodyData.size() << "\r\n";
300       } else {
301           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
302           if( r->hasBodyData() )
303           {
304             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
305             headerData << "Content-Type:" << r->bodyType() << "\r\n";
306           }
307       }
308
309       headerData << "Host: " << r->hostAndPort() << "\r\n";
310       headerData << "User-Agent:" << client->userAgent() << "\r\n";
311       headerData << "Accept-Encoding: deflate, gzip\r\n";
312       if (!client->proxyAuth().empty()) {
313           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
314       }
315
316       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
317           headerData << h.first << ": " << h.second << "\r\n";
318       }
319
320       headerData << "\r\n"; // final CRLF to terminate the headers
321       if (!bodyData.empty()) {
322           headerData << bodyData;
323       }
324
325       bool ok = push(headerData.str().c_str());
326       if (!ok) {
327           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
328           // we've over-stuffed the socket, give up for now, let things
329           // drain down before trying to start any more requests.
330           return;
331       }
332
333       if( r->hasBodyData() )
334         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
335         {
336           char buf[4096];
337           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
338           if( len )
339           {
340             if( !bufferSend(buf, len) )
341             {
342               SG_LOG(SG_IO,
343                      SG_WARN,
344                      "overflow the HTTP::Connection output buffer");
345               state = STATE_SOCKET_ERROR;
346               return;
347             }
348             body_bytes_sent += len;
349           }
350           else
351           {
352             SG_LOG(SG_IO,
353                    SG_WARN,
354                    "HTTP asynchronous request body generation is unsupported");
355             break;
356           }
357         }
358
359       //   SG_LOG(SG_IO, SG_INFO, "did start request:" << r->url() <<
360       //       "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
361       //      "\n\t on connection " << this);
362       // successfully sent, remove from queue, and maybe send the next
363       queuedRequests.pop_front();
364       sentRequests.push_back(r);
365       state = STATE_WAITING_FOR_RESPONSE;
366
367       // pipelining, let's maybe send the next request right away
368       tryStartNextRequest();
369     }
370
371     virtual void collectIncomingData(const char* s, int n)
372     {
373         idleTime.stamp();
374         client->receivedBytes(static_cast<unsigned int>(n));
375
376         if(   (state == STATE_GETTING_BODY)
377            || (state == STATE_GETTING_CHUNKED_BYTES) )
378           _contentDecoder.receivedBytes(s, n);
379         else
380           buffer.append(s, n);
381     }
382
383     virtual void foundTerminator(void)
384     {
385         idleTime.stamp();
386         switch (state) {
387         case STATE_WAITING_FOR_RESPONSE:
388             beginResponse();
389             break;
390
391         case STATE_GETTING_HEADERS:
392             processHeader();
393             buffer.clear();
394             break;
395
396         case STATE_GETTING_BODY:
397             responseComplete();
398             break;
399
400         case STATE_GETTING_CHUNKED:
401             processChunkHeader();
402             break;
403
404         case STATE_GETTING_CHUNKED_BYTES:
405             setTerminator("\r\n");
406             state = STATE_GETTING_CHUNKED;
407             buffer.clear();
408             break;
409
410
411         case STATE_GETTING_TRAILER:
412             processTrailer();
413             buffer.clear();
414             break;
415
416         case STATE_IDLE:
417             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
418
419         default:
420             break;
421         }
422     }
423
424     bool hasIdleTimeout() const
425     {
426         if (state != STATE_IDLE) {
427             return false;
428         }
429
430         assert(sentRequests.empty());
431         return idleTime.elapsedMSec() > 1000 * 10; // ten seconds
432     }
433
434     bool hasErrorTimeout() const
435     {
436       if (state == STATE_IDLE) {
437         return false;
438       }
439
440       return idleTime.elapsedMSec() > (1000 * 30); // 30 seconds
441     }
442
443     bool hasError() const
444     {
445         return (state == STATE_SOCKET_ERROR);
446     }
447
448     bool shouldStartNext() const
449     {
450       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
451     }
452
453     bool isActive() const
454     {
455         return !queuedRequests.empty() || !sentRequests.empty();
456     }
457 private:
458     bool connectToHost()
459     {
460         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
461
462         if (!open()) {
463             SG_LOG(SG_ALL, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
464             return false;
465         }
466
467         if (connect(host.c_str(), port) != 0) {
468             return false;
469         }
470
471         return true;
472     }
473
474
475     void processHeader()
476     {
477         std::string h = strutils::simplify(buffer);
478         if (h.empty()) { // blank line terminates headers
479             headersComplete();
480             return;
481         }
482
483         int colonPos = buffer.find(':');
484         if (colonPos < 0) {
485             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
486             return;
487         }
488
489         std::string key = strutils::simplify(buffer.substr(0, colonPos));
490         std::string lkey = boost::to_lower_copy(key);
491         std::string value = strutils::strip(buffer.substr(colonPos + 1));
492
493         // only consider these if getting headers (as opposed to trailers
494         // of a chunked transfer)
495         if (state == STATE_GETTING_HEADERS) {
496             if (lkey == "content-length") {
497
498                 int sz = strutils::to_int(value);
499                 if (bodyTransferSize <= 0) {
500                     bodyTransferSize = sz;
501                 }
502                 activeRequest->setResponseLength(sz);
503             } else if (lkey == "transfer-length") {
504                 bodyTransferSize = strutils::to_int(value);
505             } else if (lkey == "transfer-encoding") {
506                 processTransferEncoding(value);
507             } else if (lkey == "content-encoding") {
508                 _contentDecoder.setEncoding(value);
509             }
510         }
511
512         activeRequest->responseHeader(lkey, value);
513     }
514
515     void processTransferEncoding(const std::string& te)
516     {
517         if (te == "chunked") {
518             chunkedTransfer = true;
519         } else {
520             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
521             // failure
522         }
523     }
524
525     void processChunkHeader()
526     {
527         if (buffer.empty()) {
528             // blank line after chunk data
529             return;
530         }
531
532         int chunkSize = 0;
533         int semiPos = buffer.find(';');
534         if (semiPos >= 0) {
535             // extensions ignored for the moment
536             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
537         } else {
538             chunkSize = strutils::to_int(buffer, 16);
539         }
540
541         buffer.clear();
542         if (chunkSize == 0) {  //  trailer start
543             state = STATE_GETTING_TRAILER;
544             return;
545         }
546
547         state = STATE_GETTING_CHUNKED_BYTES;
548         setByteCount(chunkSize);
549     }
550
551     void processTrailer()
552     {
553         if (buffer.empty()) {
554             // end of trailers
555             responseComplete();
556             return;
557         }
558
559     // process as a normal header
560         processHeader();
561     }
562
563     void headersComplete()
564     {
565         activeRequest->responseHeadersComplete();
566         _contentDecoder.initWithRequest(activeRequest);
567
568         if (chunkedTransfer) {
569             state = STATE_GETTING_CHUNKED;
570         } else if (noMessageBody || (bodyTransferSize == 0)) {
571             // force the state to GETTING_BODY, to simplify logic in
572             // responseComplete and handleClose
573             state = STATE_GETTING_BODY;
574             responseComplete();
575         } else {
576             setByteCount(bodyTransferSize); // may be -1, that's fine
577             state = STATE_GETTING_BODY;
578         }
579     }
580
581     void responseComplete()
582     {
583         Request_ptr completedRequest = activeRequest;
584         _contentDecoder.finish();
585
586         assert(sentRequests.front() == activeRequest);
587         sentRequests.pop_front();
588         bool doClose = activeRequest->closeAfterComplete();
589         activeRequest = NULL;
590
591         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
592             if (doClose) {
593           // this will bring us into handleClose() above, which updates
594           // state to STATE_CLOSED
595               close();
596
597           // if we have additional requests waiting, try to start them now
598               tryStartNextRequest();
599             }
600         }
601
602         if (state != STATE_CLOSED)  {
603             state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
604         }
605
606     // notify request after we change state, so this connection is idle
607     // if completion triggers other requests (which is likely)
608         //   SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
609         completedRequest->responseComplete();
610         client->requestFinished(this);
611
612         setTerminator("\r\n");
613     }
614
615     enum ConnectionState {
616         STATE_IDLE = 0,
617         STATE_WAITING_FOR_RESPONSE,
618         STATE_GETTING_HEADERS,
619         STATE_GETTING_BODY,
620         STATE_GETTING_CHUNKED,
621         STATE_GETTING_CHUNKED_BYTES,
622         STATE_GETTING_TRAILER,
623         STATE_SOCKET_ERROR,
624         STATE_CLOSED             ///< connection should be closed now
625     };
626
627     Client* client;
628     Request_ptr activeRequest;
629     ConnectionState state;
630     std::string host;
631     short port;
632     std::string buffer;
633     int bodyTransferSize;
634     SGTimeStamp idleTime;
635     bool chunkedTransfer;
636     bool noMessageBody;
637
638     RequestList queuedRequests;
639     RequestList sentRequests;
640
641     ContentDecoder _contentDecoder;
642 };
643 #endif // of !ENABLE_CURL
644
645 Client::Client() :
646     d(new ClientPrivate)
647 {
648     d->proxyPort = 0;
649     d->maxConnections = 4;
650     d->bytesTransferred = 0;
651     d->lastTransferRate = 0;
652     d->timeTransferSample.stamp();
653     d->totalBytesDownloaded = 0;
654
655     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
656 #if defined(ENABLE_CURL)
657     static bool didInitCurlGlobal = false;
658     if (!didInitCurlGlobal) {
659       curl_global_init(CURL_GLOBAL_ALL);
660       didInitCurlGlobal = true;
661     }
662
663     d->curlMulti = curl_multi_init();
664 #endif
665 }
666
667 Client::~Client()
668 {
669 #if defined(ENABLE_CURL)
670   curl_multi_cleanup(d->curlMulti);
671 #endif
672 }
673
674 void Client::setMaxConnections(unsigned int maxCon)
675 {
676     if (maxCon < 1) {
677         throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
678     }
679
680     d->maxConnections = maxCon;
681 #if defined(ENABLE_CURL)
682     curl_multi_setopt(d->curlMulti, CURLMOPT_MAXCONNECTS, (long) maxCon);
683 #endif
684 }
685
686 void Client::update(int waitTimeout)
687 {
688 #if defined(ENABLE_CURL)
689     int remainingActive, messagesInQueue;
690     curl_multi_perform(d->curlMulti, &remainingActive);
691     d->haveActiveRequests = (remainingActive > 0);
692
693     CURLMsg* msg;
694     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
695       if (msg->msg == CURLMSG_DONE) {
696         Request* req;
697         CURL *e = msg->easy_handle;
698         curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
699
700         long responseCode;
701         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
702
703         if (msg->data.result == 0) {
704           req->responseComplete();
705         } else {
706           fprintf(stderr, "Result: %d - %s\n",
707                 msg->data.result, curl_easy_strerror(msg->data.result));
708           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
709         }
710
711         curl_multi_remove_handle(d->curlMulti, e);
712
713         // balance the reference we take in makeRequest
714         SGReferenced::put(req);
715         curl_easy_cleanup(e);
716       }
717       else {
718         SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
719       }
720     } // of curl message processing loop
721 #else
722     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
723         SGTimeStamp::sleepForMSec(waitTimeout);
724     } else {
725         d->poller.poll(waitTimeout);
726     }
727
728     bool waitingRequests = !d->pendingRequests.empty();
729     ConnectionDict::iterator it = d->connections.begin();
730     for (; it != d->connections.end(); ) {
731         Connection* con = it->second;
732         if (con->hasIdleTimeout() ||
733             con->hasError() ||
734             con->hasErrorTimeout() ||
735             (!con->isActive() && waitingRequests))
736         {
737             if (con->hasErrorTimeout()) {
738                 // tell the connection we're timing it out
739                 con->handleTimeout();
740             }
741
742         // connection has been idle for a while, clean it up
743         // (or if we have requests waiting for a different host,
744         // or an error condition
745             ConnectionDict::iterator del = it++;
746             delete del->second;
747             d->connections.erase(del);
748         } else {
749             if (it->second->shouldStartNext()) {
750                 it->second->tryStartNextRequest();
751             }
752             ++it;
753         }
754     } // of connection iteration
755
756     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
757         RequestList waiting(d->pendingRequests);
758         d->pendingRequests.clear();
759
760         // re-submit all waiting requests in order; this takes care of
761         // finding multiple pending items targetted to the same (new)
762         // connection
763         BOOST_FOREACH(Request_ptr req, waiting) {
764             makeRequest(req);
765         }
766     }
767 #endif
768 }
769
770 void Client::makeRequest(const Request_ptr& r)
771 {
772     if( r->isComplete() )
773       return;
774
775     if( r->url().find("://") == std::string::npos ) {
776         r->setFailure(EINVAL, "malformed URL");
777         return;
778     }
779
780 #if defined(ENABLE_CURL)
781     CURL* curlRequest = curl_easy_init();
782     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
783
784     // manually increase the ref count of the request
785     SGReferenced::get(r.get());
786     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
787     // disable built-in libCurl progress feedback
788     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
789
790     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
791     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
792     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
793     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
794
795     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
796
797     if (!d->proxy.empty()) {
798       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
799       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
800
801       if (!d->proxyAuth.empty()) {
802         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
803         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
804       }
805     }
806
807     std::string method = boost::to_lower_copy(r->method());
808     if (method == "get") {
809       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
810     } else if (method == "put") {
811       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
812       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
813     } else if (method == "post") {
814       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
815       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
816
817       std::string q = r->query().substr(1);
818       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
819
820       // reset URL to exclude query pieces
821       std::string urlWithoutQuery = r->url();
822       std::string::size_type queryPos = urlWithoutQuery.find('?');
823       urlWithoutQuery.resize(queryPos);
824       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
825     } else {
826       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
827     }
828
829     struct curl_slist* headerList = NULL;
830     if (r->hasBodyData() && (method != "post")) {
831       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
832       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
833       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
834       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
835       std::string h = "Content-Type:" + r->bodyType();
836       headerList = curl_slist_append(headerList, h.c_str());
837     }
838
839     StringMap::const_iterator it;
840     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
841       std::string h = it->first + ": " + it->second;
842       headerList = curl_slist_append(headerList, h.c_str());
843     }
844
845     if (headerList != NULL) {
846       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
847     }
848
849     curl_multi_add_handle(d->curlMulti, curlRequest);
850     d->haveActiveRequests = true;
851
852 // FIXME - premature?
853     r->requestStart();
854
855 #else
856     if( r->url().find("http://") != 0 ) {
857         r->setFailure(EINVAL, "only HTTP protocol is supported");
858         return;
859     }
860
861     std::string host = r->host();
862     int port = r->port();
863     if (!d->proxy.empty()) {
864         host = d->proxy;
865         port = d->proxyPort;
866     }
867
868     Connection* con = NULL;
869     std::stringstream ss;
870     ss << host << "-" << port;
871     std::string connectionId = ss.str();
872     bool havePending = !d->pendingRequests.empty();
873     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
874     ConnectionDict::iterator consEnd = d->connections.end();
875
876     // assign request to an existing Connection.
877     // various options exist here, examined in order
878     ConnectionDict::iterator it = d->connections.find(connectionId);
879     if (atConnectionsLimit && (it == consEnd)) {
880         // maximum number of connections active, queue this request
881         // when a connection goes inactive, we'll start this one
882         d->pendingRequests.push_back(r);
883         return;
884     }
885
886     // scan for an idle Connection to the same host (likely if we're
887     // retrieving multiple resources from the same host in quick succession)
888     // if we have pending requests (waiting for a free Connection), then
889     // force new requests on this id to always use the first Connection
890     // (instead of the random selection below). This ensures that when
891     // there's pressure on the number of connections to keep alive, one
892     // host can't DoS every other.
893     int count = 0;
894     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
895         if (havePending || !it->second->isActive()) {
896             con = it->second;
897             break;
898         }
899     }
900
901     if (!con && atConnectionsLimit) {
902         // all current connections are busy (active), and we don't
903         // have free connections to allocate, so let's assign to
904         // an existing one randomly. Ideally we'd used whichever one will
905         // complete first but we don't have that info.
906         int index = rand() % count;
907         for (it = d->connections.find(connectionId); index > 0; --index) { ; }
908         con = it->second;
909     }
910
911     // allocate a new connection object
912     if (!con) {
913         con = new Connection(this);
914         con->setServer(host, port);
915         d->poller.addChannel(con);
916         d->connections.insert(d->connections.end(),
917             ConnectionDict::value_type(connectionId, con));
918     }
919
920     con->queueRequest(r);
921 #endif
922 }
923
924 //------------------------------------------------------------------------------
925 FileRequestRef Client::save( const std::string& url,
926                              const std::string& filename )
927 {
928   FileRequestRef req = new FileRequest(url, filename);
929   makeRequest(req);
930   return req;
931 }
932
933 //------------------------------------------------------------------------------
934 MemoryRequestRef Client::load(const std::string& url)
935 {
936   MemoryRequestRef req = new MemoryRequest(url);
937   makeRequest(req);
938   return req;
939 }
940
941 void Client::requestFinished(Connection* con)
942 {
943
944 }
945
946 void Client::setUserAgent(const std::string& ua)
947 {
948     d->userAgent = ua;
949 }
950
951 const std::string& Client::userAgent() const
952 {
953     return d->userAgent;
954 }
955
956 const std::string& Client::proxyHost() const
957 {
958     return d->proxy;
959 }
960
961 const std::string& Client::proxyAuth() const
962 {
963     return d->proxyAuth;
964 }
965
966 void Client::setProxy( const std::string& proxy,
967                        int port,
968                        const std::string& auth )
969 {
970     d->proxy = proxy;
971     d->proxyPort = port;
972     d->proxyAuth = auth;
973 }
974
975 bool Client::hasActiveRequests() const
976 {
977   #if defined(ENABLE_CURL)
978     return d->haveActiveRequests;
979   #else
980     ConnectionDict::const_iterator it = d->connections.begin();
981     for (; it != d->connections.end(); ++it) {
982         if (it->second->isActive()) return true;
983     }
984
985     return false;
986 #endif
987 }
988
989 void Client::receivedBytes(unsigned int count)
990 {
991     d->bytesTransferred += count;
992     d->totalBytesDownloaded += count;
993 }
994
995 unsigned int Client::transferRateBytesPerSec() const
996 {
997     unsigned int e = d->timeTransferSample.elapsedMSec();
998     if (e > 400) {
999         // too long a window, ignore
1000         d->timeTransferSample.stamp();
1001         d->bytesTransferred = 0;
1002         d->lastTransferRate = 0;
1003         return 0;
1004     }
1005
1006     if (e < 100) { // avoid really narrow windows
1007         return d->lastTransferRate;
1008     }
1009
1010     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1011     // run a low-pass filter
1012     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1013     smoothed /= 400;
1014
1015     d->timeTransferSample.stamp();
1016     d->bytesTransferred = 0;
1017     d->lastTransferRate = smoothed;
1018     return smoothed;
1019 }
1020
1021 uint64_t Client::totalBytesDownloaded() const
1022 {
1023     return d->totalBytesDownloaded;
1024 }
1025
1026 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1027 {
1028   size_t byteSize = size * nmemb;
1029
1030   Request* req = static_cast<Request*>(userdata);
1031   req->processBodyBytes(ptr, byteSize);
1032   return byteSize;
1033 }
1034
1035 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1036 {
1037   size_t maxBytes = size * nmemb;
1038   Request* req = static_cast<Request*>(userdata);
1039   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1040   return actualBytes;
1041 }
1042
1043 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1044 {
1045   size_t byteSize = size * nitems;
1046   Request* req = static_cast<Request*>(userdata);
1047   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1048
1049   if (req->readyState() == HTTP::Request::OPENED) {
1050     req->responseStart(h);
1051     return byteSize;
1052   }
1053
1054   if (h.empty()) {
1055       // got a 100-continue reponse; restart
1056       if (req->responseCode() == 100) {
1057           req->setReadyState(HTTP::Request::OPENED);
1058           return byteSize;
1059       }
1060
1061     req->responseHeadersComplete();
1062     return byteSize;
1063   }
1064
1065   if (req->responseCode() == 100) {
1066       return byteSize; // skip headers associated with 100-continue status
1067   }
1068
1069   int colonPos = h.find(':');
1070   if (colonPos == std::string::npos) {
1071       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1072       return byteSize;
1073   }
1074
1075   std::string key = strutils::simplify(h.substr(0, colonPos));
1076   std::string lkey = boost::to_lower_copy(key);
1077   std::string value = strutils::strip(h.substr(colonPos + 1));
1078
1079   req->responseHeader(lkey, value);
1080   return byteSize;
1081 }
1082
1083 } // of namespace HTTP
1084
1085 } // of namespace simgear