]> git.mxchange.org Git - simgear.git/blob - simgear/io/HTTPClient.cxx
26ebcb66ebe86617f193f74b68aae113dad0f051
[simgear.git] / simgear / io / HTTPClient.cxx
1 /**
 * \file HTTPClient.cxx - simple HTTP client engine for SimGear
3  */
4
5 // Written by James Turner
6 //
7 // Copyright (C) 2013  James Turner  <zakalawe@mac.com>
8 //
9 // This library is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Library General Public
11 // License as published by the Free Software Foundation; either
12 // version 2 of the License, or (at your option) any later version.
13 //
14 // This library is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 // Library General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program; if not, write to the Free Software
21 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
22 //
23
24
25 #include "HTTPClient.hxx"
26 #include "HTTPFileRequest.hxx"
27
28 #include <sstream>
29 #include <cassert>
30 #include <cstdlib> // rand()
31 #include <list>
32 #include <errno.h>
33 #include <map>
34 #include <stdexcept>
35
36 #include <boost/foreach.hpp>
37 #include <boost/algorithm/string/case_conv.hpp>
38
39 #include <simgear/simgear_config.h>
40
41 #if defined(ENABLE_CURL)
42   #include <curl/multi.h>
43 #else
44     #include <simgear/io/HTTPContentDecode.hxx>
45 #endif
46
47 #include <simgear/io/sg_netChat.hxx>
48
49 #include <simgear/misc/strutils.hxx>
50 #include <simgear/compiler.h>
51 #include <simgear/debug/logstream.hxx>
52 #include <simgear/timing/timestamp.hxx>
53 #include <simgear/structure/exception.hxx>
54
55 #if defined( HAVE_VERSION_H ) && HAVE_VERSION_H
56 #include "version.h"
57 #else
58 #  if !defined(SIMGEAR_VERSION)
59 #    define SIMGEAR_VERSION "simgear-development"
60 #  endif
61 #endif
62
63 namespace simgear
64 {
65
66 namespace HTTP
67 {
68
// Port assigned to a Connection until setServer() is called. 'extern' gives
// this const object external linkage so other translation units can use it.
extern const int DEFAULT_HTTP_PORT = 80;
// Requests with this body type have their query string sent as the body.
const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
// Limit on requests sent over one Connection that still await a response.
const unsigned int MAX_INFLIGHT_REQUESTS = 32;

class Connection;
// Connections keyed by connection id ("host-port"); several may share an id.
typedef std::multimap<std::string, Connection*> ConnectionDict;
typedef std::list<Request_ptr> RequestList;
76
77 class Client::ClientPrivate
78 {
79 public:
80 #if defined(ENABLE_CURL)
81     CURLM* curlMulti;
82     bool haveActiveRequests;
83 #else
84     NetChannelPoller poller;
85 // connections by host (potentially more than one)
86     ConnectionDict connections;
87 #endif
88
89     std::string userAgent;
90     std::string proxy;
91     int proxyPort;
92     std::string proxyAuth;
93     unsigned int maxConnections;
94
95     RequestList pendingRequests;
96
97
98
99     SGTimeStamp timeTransferSample;
100     unsigned int bytesTransferred;
101     unsigned int lastTransferRate;
102     uint64_t totalBytesDownloaded;
103 };
104
105 #if !defined(ENABLE_CURL)
106 class Connection : public NetChat
107 {
108 public:
109     Connection(Client* pr, const std::string& conId) :
110         client(pr),
111         state(STATE_CLOSED),
112         port(DEFAULT_HTTP_PORT),
113         connectionId(conId)
114     {
115     }
116
117     virtual ~Connection()
118     {
119     }
120
121     virtual void handleBufferRead (NetBuffer& buffer)
122     {
123       if( !activeRequest || !activeRequest->isComplete() )
124         return NetChat::handleBufferRead(buffer);
125
126       // Request should be aborted (signaled by setting its state to complete).
127
128       // force the state to GETTING_BODY, to simplify logic in
129       // responseComplete and handleClose
130       state = STATE_GETTING_BODY;
131       responseComplete();
132     }
133
134     void setServer(const std::string& h, short p)
135     {
136         host = h;
137         port = p;
138     }
139
140     // socket-level errors
141     virtual void handleError(int error)
142     {
143         const char* errStr = strerror(error);
144         SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
145                << errStr << ")");
146
147         debugDumpRequests();
148
149         if (!activeRequest)
150         {
151         // connection level failure, eg name lookup or routing
152         // we won't have an active request yet, so let's fail all of the
153         // requests since we presume it's a systematic failure for
154         // the host in question
155             BOOST_FOREACH(Request_ptr req, sentRequests) {
156                 req->setFailure(error, errStr);
157             }
158
159             BOOST_FOREACH(Request_ptr req, queuedRequests) {
160                 req->setFailure(error, errStr);
161             }
162
163             sentRequests.clear();
164             queuedRequests.clear();
165         }
166
167         NetChat::handleError(error);
168         if (activeRequest) {
169             activeRequest->setFailure(error, errStr);
170             activeRequest = NULL;
171             _contentDecoder.reset();
172         }
173
174         state = STATE_SOCKET_ERROR;
175     }
176
177     void handleTimeout()
178     {
179         handleError(ETIMEDOUT);
180     }
181
182     virtual void handleClose()
183     {
184         NetChat::handleClose();
185
186     // closing of the connection from the server side when getting the body,
187         bool canCloseState = (state == STATE_GETTING_BODY);
188         if (canCloseState && activeRequest) {
189         // force state here, so responseComplete can avoid closing the
190         // socket again
191             state =  STATE_CLOSED;
192             responseComplete();
193         } else {
194             if (state == STATE_WAITING_FOR_RESPONSE) {
195                 assert(!sentRequests.empty());
196                 sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
197                 // no active request, but don't restore the front sent one
198                 sentRequests.erase(sentRequests.begin());
199             }
200
201             if (activeRequest) {
202                 activeRequest->setFailure(500, "server closed connection");
203                 // remove the failed request from sentRequests, so it does
204                 // not get restored
205                 RequestList::iterator it = std::find(sentRequests.begin(),
206                     sentRequests.end(), activeRequest);
207                 if (it != sentRequests.end()) {
208                     sentRequests.erase(it);
209                 }
210                 activeRequest = NULL;
211                 _contentDecoder.reset();
212             }
213
214             state = STATE_CLOSED;
215         }
216
217       if (sentRequests.empty()) {
218         return;
219       }
220
221     // restore sent requests to the queue, so they will be re-sent
222     // when the connection opens again
223       queuedRequests.insert(queuedRequests.begin(),
224                               sentRequests.begin(), sentRequests.end());
225       sentRequests.clear();
226     }
227
228     void queueRequest(const Request_ptr& r)
229     {
230         queuedRequests.push_back(r);
231         tryStartNextRequest();
232     }
233
234     void beginResponse()
235     {
236         assert(!sentRequests.empty());
237         assert(state == STATE_WAITING_FOR_RESPONSE);
238
239         activeRequest = sentRequests.front();
240         try {
241             activeRequest->responseStart(buffer);
242         } catch (sg_exception& e) {
243             handleError(EIO);
244             return;
245         }
246
247       state = STATE_GETTING_HEADERS;
248       buffer.clear();
249       if (activeRequest->responseCode() == 204) {
250         noMessageBody = true;
251       } else if (activeRequest->method() == "HEAD") {
252         noMessageBody = true;
253       } else {
254         noMessageBody = false;
255       }
256
257       bodyTransferSize = -1;
258       chunkedTransfer = false;
259       _contentDecoder.reset();
260     }
261
262     void tryStartNextRequest()
263     {
264       while( !queuedRequests.empty()
265           && queuedRequests.front()->isComplete() )
266         queuedRequests.pop_front();
267
268       if (queuedRequests.empty()) {
269         idleTime.stamp();
270         return;
271       }
272
273       if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
274         return;
275       }
276
277       if (state == STATE_CLOSED) {
278           if (!connectToHost()) {
279
280               return;
281           }
282
283           setTerminator("\r\n");
284           state = STATE_IDLE;
285       }
286
287       Request_ptr r = queuedRequests.front();
288       r->requestStart();
289
290       std::stringstream headerData;
291       std::string path = r->path();
292       assert(!path.empty());
293       std::string query = r->query();
294       std::string bodyData;
295
296       if (!client->proxyHost().empty()) {
297           path = r->scheme() + "://" + r->host() + r->path();
298       }
299
300       if (r->bodyType() == CONTENT_TYPE_URL_ENCODED) {
301           headerData << r->method() << " " << path << " HTTP/1.1\r\n";
302           bodyData = query.substr(1); // URL-encode, drop the leading '?'
303           headerData << "Content-Type:" << CONTENT_TYPE_URL_ENCODED << "\r\n";
304           headerData << "Content-Length:" << bodyData.size() << "\r\n";
305       } else {
306           headerData << r->method() << " " << path << query << " HTTP/1.1\r\n";
307           if( r->hasBodyData() )
308           {
309             headerData << "Content-Length:" << r->bodyLength() << "\r\n";
310             headerData << "Content-Type:" << r->bodyType() << "\r\n";
311           }
312       }
313
314       headerData << "Host: " << r->hostAndPort() << "\r\n";
315       headerData << "User-Agent:" << client->userAgent() << "\r\n";
316       headerData << "Accept-Encoding: deflate, gzip\r\n";
317       if (!client->proxyAuth().empty()) {
318           headerData << "Proxy-Authorization: " << client->proxyAuth() << "\r\n";
319       }
320
321       BOOST_FOREACH(const StringMap::value_type& h, r->requestHeaders()) {
322           headerData << h.first << ": " << h.second << "\r\n";
323       }
324
325       headerData << "\r\n"; // final CRLF to terminate the headers
326       if (!bodyData.empty()) {
327           headerData << bodyData;
328       }
329
330       bool ok = push(headerData.str().c_str());
331       if (!ok) {
332           SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
333           // we've over-stuffed the socket, give up for now, let things
334           // drain down before trying to start any more requests.
335           return;
336       }
337
338       if( r->hasBodyData() )
339         for(size_t body_bytes_sent = 0; body_bytes_sent < r->bodyLength();)
340         {
341           char buf[4096];
342           size_t len = r->getBodyData(buf, body_bytes_sent, 4096);
343           if( len )
344           {
345             if( !bufferSend(buf, len) )
346             {
347               SG_LOG(SG_IO,
348                      SG_WARN,
349                      "overflow the HTTP::Connection output buffer");
350               state = STATE_SOCKET_ERROR;
351               return;
352             }
353             body_bytes_sent += len;
354           }
355           else
356           {
357             SG_LOG(SG_IO,
358                    SG_WARN,
359                    "HTTP asynchronous request body generation is unsupported");
360             break;
361           }
362         }
363
364         SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
365       // successfully sent, remove from queue, and maybe send the next
366       queuedRequests.pop_front();
367       sentRequests.push_back(r);
368       state = STATE_WAITING_FOR_RESPONSE;
369
370       // pipelining, let's maybe send the next request right away
371       tryStartNextRequest();
372     }
373
374     virtual void collectIncomingData(const char* s, int n)
375     {
376         idleTime.stamp();
377         client->receivedBytes(static_cast<unsigned int>(n));
378
379         if(   (state == STATE_GETTING_BODY)
380            || (state == STATE_GETTING_CHUNKED_BYTES) )
381           _contentDecoder.receivedBytes(s, n);
382         else
383           buffer.append(s, n);
384     }
385
386     virtual void foundTerminator(void)
387     {
388         idleTime.stamp();
389         switch (state) {
390         case STATE_WAITING_FOR_RESPONSE:
391             beginResponse();
392             break;
393
394         case STATE_GETTING_HEADERS:
395             processHeader();
396             buffer.clear();
397             break;
398
399         case STATE_GETTING_BODY:
400             responseComplete();
401             break;
402
403         case STATE_GETTING_CHUNKED:
404             processChunkHeader();
405             break;
406
407         case STATE_GETTING_CHUNKED_BYTES:
408             setTerminator("\r\n");
409             state = STATE_GETTING_CHUNKED;
410             buffer.clear();
411             break;
412
413
414         case STATE_GETTING_TRAILER:
415             processTrailer();
416             buffer.clear();
417             break;
418
419         case STATE_IDLE:
420             SG_LOG(SG_IO, SG_WARN, "HTTP got data in IDLE state, bad server?");
421
422         default:
423             break;
424         }
425     }
426
427     bool hasIdleTimeout() const
428     {
429         if ((state != STATE_IDLE) && (state != STATE_CLOSED)) {
430             return false;
431         }
432
433         assert(sentRequests.empty());
434         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 10)); // 10 seconds
435         return isTimedOut;
436     }
437
438     bool hasErrorTimeout() const
439     {
440       if ((state == STATE_IDLE) || (state == STATE_CLOSED)) {
441         return false;
442       }
443
444         bool isTimedOut = (idleTime.elapsedMSec() > (1000 * 30)); // 30 seconds
445         return isTimedOut;
446     }
447
448     bool hasError() const
449     {
450         return (state == STATE_SOCKET_ERROR);
451     }
452
453     bool shouldStartNext() const
454     {
455       return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
456     }
457
458     bool isActive() const
459     {
460         return !queuedRequests.empty() || !sentRequests.empty();
461     }
462
463     void debugDumpRequests() const
464     {
465         SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
466                << "; state=" << state << ")");
467         if (activeRequest) {
468             SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
469         } else {
470             SG_LOG(SG_IO, SG_DEBUG, "\tNo active request");
471         }
472
473         BOOST_FOREACH(Request_ptr req, sentRequests) {
474             SG_LOG(SG_IO, SG_DEBUG, "\tsent:" << req->url());
475         }
476
477         BOOST_FOREACH(Request_ptr req, queuedRequests) {
478             SG_LOG(SG_IO, SG_DEBUG, "\tqueued:" << req->url());
479         }
480     }
481 private:
482     bool connectToHost()
483     {
484         SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
485
486         if (!open()) {
487             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: open() failed");
488             return false;
489         }
490
491         if (connect(host.c_str(), port) != 0) {
492             SG_LOG(SG_IO, SG_WARN, "HTTP::Connection: connectToHost: connect() failed");
493             return false;
494         }
495
496         return true;
497     }
498
499
500     void processHeader()
501     {
502         std::string h = strutils::simplify(buffer);
503         if (h.empty()) { // blank line terminates headers
504             headersComplete();
505             return;
506         }
507
508         int colonPos = buffer.find(':');
509         if (colonPos < 0) {
510             SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
511             return;
512         }
513
514         std::string key = strutils::simplify(buffer.substr(0, colonPos));
515         std::string lkey = boost::to_lower_copy(key);
516         std::string value = strutils::strip(buffer.substr(colonPos + 1));
517
518         // only consider these if getting headers (as opposed to trailers
519         // of a chunked transfer)
520         if (state == STATE_GETTING_HEADERS) {
521             if (lkey == "content-length") {
522
523                 int sz = strutils::to_int(value);
524                 if (bodyTransferSize <= 0) {
525                     bodyTransferSize = sz;
526                 }
527                 activeRequest->setResponseLength(sz);
528             } else if (lkey == "transfer-length") {
529                 bodyTransferSize = strutils::to_int(value);
530             } else if (lkey == "transfer-encoding") {
531                 processTransferEncoding(value);
532             } else if (lkey == "content-encoding") {
533                 _contentDecoder.setEncoding(value);
534             }
535         }
536
537         activeRequest->responseHeader(lkey, value);
538     }
539
540     void processTransferEncoding(const std::string& te)
541     {
542         if (te == "chunked") {
543             chunkedTransfer = true;
544         } else {
545             SG_LOG(SG_IO, SG_WARN, "unsupported transfer encoding:" << te);
546             // failure
547         }
548     }
549
550     void processChunkHeader()
551     {
552         if (buffer.empty()) {
553             // blank line after chunk data
554             return;
555         }
556
557         int chunkSize = 0;
558         int semiPos = buffer.find(';');
559         if (semiPos >= 0) {
560             // extensions ignored for the moment
561             chunkSize = strutils::to_int(buffer.substr(0, semiPos), 16);
562         } else {
563             chunkSize = strutils::to_int(buffer, 16);
564         }
565
566         buffer.clear();
567         if (chunkSize == 0) {  //  trailer start
568             state = STATE_GETTING_TRAILER;
569             return;
570         }
571
572         state = STATE_GETTING_CHUNKED_BYTES;
573         setByteCount(chunkSize);
574     }
575
576     void processTrailer()
577     {
578         if (buffer.empty()) {
579             // end of trailers
580             responseComplete();
581             return;
582         }
583
584     // process as a normal header
585         processHeader();
586     }
587
588     void headersComplete()
589     {
590         activeRequest->responseHeadersComplete();
591         _contentDecoder.initWithRequest(activeRequest);
592
593         if (chunkedTransfer) {
594             state = STATE_GETTING_CHUNKED;
595         } else if (noMessageBody || (bodyTransferSize == 0)) {
596             // force the state to GETTING_BODY, to simplify logic in
597             // responseComplete and handleClose
598             state = STATE_GETTING_BODY;
599             responseComplete();
600         } else {
601             setByteCount(bodyTransferSize); // may be -1, that's fine
602             state = STATE_GETTING_BODY;
603         }
604     }
605
606     void responseComplete()
607     {
608         Request_ptr completedRequest = activeRequest;
609         _contentDecoder.finish();
610
611         assert(sentRequests.front() == activeRequest);
612         sentRequests.pop_front();
613         bool doClose = activeRequest->closeAfterComplete();
614         activeRequest = NULL;
615
616         if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
617             if (doClose) {
618           // this will bring us into handleClose() above, which updates
619           // state to STATE_CLOSED
620               close();
621
622           // if we have additional requests waiting, try to start them now
623               tryStartNextRequest();
624             }
625         }
626
627         if (state != STATE_CLOSED)  {
628             state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
629         }
630
631     // notify request after we change state, so this connection is idle
632     // if completion triggers other requests (which is likely)
633         completedRequest->responseComplete();
634         client->requestFinished(this);
635
636         setTerminator("\r\n");
637     }
638
639     enum ConnectionState {
640         STATE_IDLE = 0,
641         STATE_WAITING_FOR_RESPONSE,
642         STATE_GETTING_HEADERS,
643         STATE_GETTING_BODY,
644         STATE_GETTING_CHUNKED,
645         STATE_GETTING_CHUNKED_BYTES,
646         STATE_GETTING_TRAILER,
647         STATE_SOCKET_ERROR,
648         STATE_CLOSED             ///< connection should be closed now
649     };
650
651     Client* client;
652     Request_ptr activeRequest;
653     ConnectionState state;
654     std::string host;
655     short port;
656     std::string buffer;
657     int bodyTransferSize;
658     SGTimeStamp idleTime;
659     bool chunkedTransfer;
660     bool noMessageBody;
661
662     RequestList queuedRequests;
663     RequestList sentRequests;
664
665     ContentDecoder _contentDecoder;
666     std::string connectionId;
667 };
668 #endif // of !ENABLE_CURL
669
670 Client::Client() :
671     d(new ClientPrivate)
672 {
673     d->proxyPort = 0;
674     d->maxConnections = 4;
675     d->bytesTransferred = 0;
676     d->lastTransferRate = 0;
677     d->timeTransferSample.stamp();
678     d->totalBytesDownloaded = 0;
679
680     setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
681 #if defined(ENABLE_CURL)
682     static bool didInitCurlGlobal = false;
683     if (!didInitCurlGlobal) {
684       curl_global_init(CURL_GLOBAL_ALL);
685       didInitCurlGlobal = true;
686     }
687
688     d->curlMulti = curl_multi_init();
689 #endif
690 }
691
692 Client::~Client()
693 {
694 #if defined(ENABLE_CURL)
695   curl_multi_cleanup(d->curlMulti);
696 #endif
697 }
698
699 void Client::setMaxConnections(unsigned int maxCon)
700 {
701     if (maxCon < 1) {
702         throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
703     }
704
705     d->maxConnections = maxCon;
706 #if defined(ENABLE_CURL)
707     curl_multi_setopt(d->curlMulti, CURLMOPT_MAXCONNECTS, (long) maxCon);
708 #endif
709 }
710
711 void Client::update(int waitTimeout)
712 {
713 #if defined(ENABLE_CURL)
714     int remainingActive, messagesInQueue;
715     curl_multi_perform(d->curlMulti, &remainingActive);
716     d->haveActiveRequests = (remainingActive > 0);
717
718     CURLMsg* msg;
719     while ((msg = curl_multi_info_read(d->curlMulti, &messagesInQueue))) {
720       if (msg->msg == CURLMSG_DONE) {
721         Request* req;
722         CURL *e = msg->easy_handle;
723         curl_easy_getinfo(e, CURLINFO_PRIVATE, &req);
724
725         long responseCode;
726         curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &responseCode);
727
728         if (msg->data.result == 0) {
729           req->responseComplete();
730         } else {
731           fprintf(stderr, "Result: %d - %s\n",
732                 msg->data.result, curl_easy_strerror(msg->data.result));
733           req->setFailure(msg->data.result, curl_easy_strerror(msg->data.result));
734         }
735
736         curl_multi_remove_handle(d->curlMulti, e);
737
738         // balance the reference we take in makeRequest
739         SGReferenced::put(req);
740         curl_easy_cleanup(e);
741       }
742       else {
743         SG_LOG(SG_IO, SG_ALERT, "CurlMSG:" << msg->msg);
744       }
745     } // of curl message processing loop
746 #else
747     if (!d->poller.hasChannels() && (waitTimeout > 0)) {
748         SGTimeStamp::sleepForMSec(waitTimeout);
749     } else {
750         d->poller.poll(waitTimeout);
751     }
752
753     bool waitingRequests = !d->pendingRequests.empty();
754     ConnectionDict::iterator it = d->connections.begin();
755     for (; it != d->connections.end(); ) {
756         Connection* con = it->second;
757         if (con->hasIdleTimeout() ||
758             con->hasError() ||
759             con->hasErrorTimeout() ||
760             (!con->isActive() && waitingRequests))
761         {
762             if (con->hasErrorTimeout()) {
763                 // tell the connection we're timing it out
764                 con->handleTimeout();
765             }
766
767         // connection has been idle for a while, clean it up
768         // (or if we have requests waiting for a different host,
769         // or an error condition
770             ConnectionDict::iterator del = it++;
771             delete del->second;
772             d->connections.erase(del);
773         } else {
774             if (it->second->shouldStartNext()) {
775                 it->second->tryStartNextRequest();
776             }
777             ++it;
778         }
779     } // of connection iteration
780
781     if (waitingRequests && (d->connections.size() < d->maxConnections)) {
782         RequestList waiting(d->pendingRequests);
783         d->pendingRequests.clear();
784
785         // re-submit all waiting requests in order; this takes care of
786         // finding multiple pending items targetted to the same (new)
787         // connection
788         BOOST_FOREACH(Request_ptr req, waiting) {
789             makeRequest(req);
790         }
791     }
792 #endif
793 }
794
795 void Client::makeRequest(const Request_ptr& r)
796 {
797     if( r->isComplete() )
798       return;
799
800     if( r->url().find("://") == std::string::npos ) {
801         r->setFailure(EINVAL, "malformed URL");
802         return;
803     }
804
805 #if defined(ENABLE_CURL)
806     CURL* curlRequest = curl_easy_init();
807     curl_easy_setopt(curlRequest, CURLOPT_URL, r->url().c_str());
808
809     // manually increase the ref count of the request
810     SGReferenced::get(r.get());
811     curl_easy_setopt(curlRequest, CURLOPT_PRIVATE, r.get());
812     // disable built-in libCurl progress feedback
813     curl_easy_setopt(curlRequest, CURLOPT_NOPROGRESS, 1);
814
815     curl_easy_setopt(curlRequest, CURLOPT_WRITEFUNCTION, requestWriteCallback);
816     curl_easy_setopt(curlRequest, CURLOPT_WRITEDATA, r.get());
817     curl_easy_setopt(curlRequest, CURLOPT_HEADERFUNCTION, requestHeaderCallback);
818     curl_easy_setopt(curlRequest, CURLOPT_HEADERDATA, r.get());
819
820     curl_easy_setopt(curlRequest, CURLOPT_USERAGENT, d->userAgent.c_str());
821
822     if (!d->proxy.empty()) {
823       curl_easy_setopt(curlRequest, CURLOPT_PROXY, d->proxy.c_str());
824       curl_easy_setopt(curlRequest, CURLOPT_PROXYPORT, d->proxyPort);
825
826       if (!d->proxyAuth.empty()) {
827         curl_easy_setopt(curlRequest, CURLOPT_PROXYAUTH, CURLAUTH_BASIC);
828         curl_easy_setopt(curlRequest, CURLOPT_PROXYUSERPWD, d->proxyAuth.c_str());
829       }
830     }
831
832     std::string method = boost::to_lower_copy(r->method());
833     if (method == "get") {
834       curl_easy_setopt(curlRequest, CURLOPT_HTTPGET, 1);
835     } else if (method == "put") {
836       curl_easy_setopt(curlRequest, CURLOPT_PUT, 1);
837       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
838     } else if (method == "post") {
839       // see http://curl.haxx.se/libcurl/c/CURLOPT_POST.html
840       curl_easy_setopt(curlRequest, CURLOPT_HTTPPOST, 1);
841
842       std::string q = r->query().substr(1);
843       curl_easy_setopt(curlRequest, CURLOPT_COPYPOSTFIELDS, q.c_str());
844
845       // reset URL to exclude query pieces
846       std::string urlWithoutQuery = r->url();
847       std::string::size_type queryPos = urlWithoutQuery.find('?');
848       urlWithoutQuery.resize(queryPos);
849       curl_easy_setopt(curlRequest, CURLOPT_URL, urlWithoutQuery.c_str());
850     } else {
851       curl_easy_setopt(curlRequest, CURLOPT_CUSTOMREQUEST, r->method().c_str());
852     }
853
854     struct curl_slist* headerList = NULL;
855     if (r->hasBodyData() && (method != "post")) {
856       curl_easy_setopt(curlRequest, CURLOPT_UPLOAD, 1);
857       curl_easy_setopt(curlRequest, CURLOPT_INFILESIZE, r->bodyLength());
858       curl_easy_setopt(curlRequest, CURLOPT_READFUNCTION, requestReadCallback);
859       curl_easy_setopt(curlRequest, CURLOPT_READDATA, r.get());
860       std::string h = "Content-Type:" + r->bodyType();
861       headerList = curl_slist_append(headerList, h.c_str());
862     }
863
864     StringMap::const_iterator it;
865     for (it = r->requestHeaders().begin(); it != r->requestHeaders().end(); ++it) {
866       std::string h = it->first + ": " + it->second;
867       headerList = curl_slist_append(headerList, h.c_str());
868     }
869
870     if (headerList != NULL) {
871       curl_easy_setopt(curlRequest, CURLOPT_HTTPHEADER, headerList);
872     }
873
874     curl_multi_add_handle(d->curlMulti, curlRequest);
875     d->haveActiveRequests = true;
876
877 // FIXME - premature?
878     r->requestStart();
879
880 #else
881     if( r->url().find("http://") != 0 ) {
882         r->setFailure(EINVAL, "only HTTP protocol is supported");
883         return;
884     }
885
886     std::string host = r->host();
887     int port = r->port();
888     if (!d->proxy.empty()) {
889         host = d->proxy;
890         port = d->proxyPort;
891     }
892
893     Connection* con = NULL;
894     std::stringstream ss;
895     ss << host << "-" << port;
896     std::string connectionId = ss.str();
897     bool havePending = !d->pendingRequests.empty();
898     bool atConnectionsLimit = d->connections.size() >= d->maxConnections;
899     ConnectionDict::iterator consEnd = d->connections.end();
900
901     // assign request to an existing Connection.
902     // various options exist here, examined in order
903     ConnectionDict::iterator it = d->connections.find(connectionId);
904     if (atConnectionsLimit && (it == consEnd)) {
905         // maximum number of connections active, queue this request
906         // when a connection goes inactive, we'll start this one
907         d->pendingRequests.push_back(r);
908         return;
909     }
910
911     // scan for an idle Connection to the same host (likely if we're
912     // retrieving multiple resources from the same host in quick succession)
913     // if we have pending requests (waiting for a free Connection), then
914     // force new requests on this id to always use the first Connection
915     // (instead of the random selection below). This ensures that when
916     // there's pressure on the number of connections to keep alive, one
917     // host can't DoS every other.
918     int count = 0;
919     for (; (it != consEnd) && (it->first == connectionId); ++it, ++count) {
920         if (havePending || !it->second->isActive()) {
921             con = it->second;
922             break;
923         }
924     }
925
926     if (!con && atConnectionsLimit) {
927         // all current connections are busy (active), and we don't
928         // have free connections to allocate, so let's assign to
929         // an existing one randomly. Ideally we'd used whichever one will
930         // complete first but we don't have that info.
931         int index = rand() % count;
932         for (it = d->connections.find(connectionId); index > 0; --index) { ; }
933         con = it->second;
934     }
935
936     // allocate a new connection object
937     if (!con) {
938         con = new Connection(this, connectionId);
939         con->setServer(host, port);
940         d->poller.addChannel(con);
941         d->connections.insert(d->connections.end(),
942             ConnectionDict::value_type(connectionId, con));
943     }
944
945     con->queueRequest(r);
946 #endif
947 }
948
949 //------------------------------------------------------------------------------
950 FileRequestRef Client::save( const std::string& url,
951                              const std::string& filename )
952 {
953   FileRequestRef req = new FileRequest(url, filename);
954   makeRequest(req);
955   return req;
956 }
957
958 //------------------------------------------------------------------------------
959 MemoryRequestRef Client::load(const std::string& url)
960 {
961   MemoryRequestRef req = new MemoryRequest(url);
962   makeRequest(req);
963   return req;
964 }
965
966 void Client::requestFinished(Connection* con)
967 {
968
969 }
970
971 void Client::setUserAgent(const std::string& ua)
972 {
973     d->userAgent = ua;
974 }
975
976 const std::string& Client::userAgent() const
977 {
978     return d->userAgent;
979 }
980
981 const std::string& Client::proxyHost() const
982 {
983     return d->proxy;
984 }
985
986 const std::string& Client::proxyAuth() const
987 {
988     return d->proxyAuth;
989 }
990
991 void Client::setProxy( const std::string& proxy,
992                        int port,
993                        const std::string& auth )
994 {
995     d->proxy = proxy;
996     d->proxyPort = port;
997     d->proxyAuth = auth;
998 }
999
1000 bool Client::hasActiveRequests() const
1001 {
1002   #if defined(ENABLE_CURL)
1003     return d->haveActiveRequests;
1004   #else
1005     ConnectionDict::const_iterator it = d->connections.begin();
1006     for (; it != d->connections.end(); ++it) {
1007         if (it->second->isActive()) return true;
1008     }
1009
1010     return false;
1011 #endif
1012 }
1013
1014 void Client::receivedBytes(unsigned int count)
1015 {
1016     d->bytesTransferred += count;
1017     d->totalBytesDownloaded += count;
1018 }
1019
1020 unsigned int Client::transferRateBytesPerSec() const
1021 {
1022     unsigned int e = d->timeTransferSample.elapsedMSec();
1023     if (e > 400) {
1024         // too long a window, ignore
1025         d->timeTransferSample.stamp();
1026         d->bytesTransferred = 0;
1027         d->lastTransferRate = 0;
1028         return 0;
1029     }
1030
1031     if (e < 100) { // avoid really narrow windows
1032         return d->lastTransferRate;
1033     }
1034
1035     unsigned int ratio = (d->bytesTransferred * 1000) / e;
1036     // run a low-pass filter
1037     unsigned int smoothed = ((400 - e) * d->lastTransferRate) + (e * ratio);
1038     smoothed /= 400;
1039
1040     d->timeTransferSample.stamp();
1041     d->bytesTransferred = 0;
1042     d->lastTransferRate = smoothed;
1043     return smoothed;
1044 }
1045
1046 uint64_t Client::totalBytesDownloaded() const
1047 {
1048     return d->totalBytesDownloaded;
1049 }
1050
1051 size_t Client::requestWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1052 {
1053   size_t byteSize = size * nmemb;
1054
1055   Request* req = static_cast<Request*>(userdata);
1056   req->processBodyBytes(ptr, byteSize);
1057   return byteSize;
1058 }
1059
1060 size_t Client::requestReadCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
1061 {
1062   size_t maxBytes = size * nmemb;
1063   Request* req = static_cast<Request*>(userdata);
1064   size_t actualBytes = req->getBodyData(ptr, 0, maxBytes);
1065   return actualBytes;
1066 }
1067
1068 size_t Client::requestHeaderCallback(char *rawBuffer, size_t size, size_t nitems, void *userdata)
1069 {
1070   size_t byteSize = size * nitems;
1071   Request* req = static_cast<Request*>(userdata);
1072   std::string h = strutils::simplify(std::string(rawBuffer, byteSize));
1073
1074   if (req->readyState() == HTTP::Request::OPENED) {
1075     req->responseStart(h);
1076     return byteSize;
1077   }
1078
1079   if (h.empty()) {
1080       // got a 100-continue reponse; restart
1081       if (req->responseCode() == 100) {
1082           req->setReadyState(HTTP::Request::OPENED);
1083           return byteSize;
1084       }
1085
1086     req->responseHeadersComplete();
1087     return byteSize;
1088   }
1089
1090   if (req->responseCode() == 100) {
1091       return byteSize; // skip headers associated with 100-continue status
1092   }
1093
1094   size_t colonPos = h.find(':');
1095   if (colonPos == std::string::npos) {
1096       SG_LOG(SG_IO, SG_WARN, "malformed HTTP response header:" << h);
1097       return byteSize;
1098   }
1099
1100   std::string key = strutils::simplify(h.substr(0, colonPos));
1101   std::string lkey = boost::to_lower_copy(key);
1102   std::string value = strutils::strip(h.substr(colonPos + 1));
1103
1104   req->responseHeader(lkey, value);
1105   return byteSize;
1106 }
1107
1108 void Client::debugDumpRequests()
1109 {
1110 #if defined(ENABLE_CURL)
1111
1112 #else
1113     SG_LOG(SG_IO, SG_INFO, "== HTTP connection dump");
1114     ConnectionDict::iterator it = d->connections.begin();
1115     for (; it != d->connections.end(); ++it) {
1116         it->second->debugDumpRequests();
1117     }
1118     SG_LOG(SG_IO, SG_INFO, "==");
1119 #endif
1120 }
1121
1122 } // of namespace HTTP
1123
1124 } // of namespace simgear