const int GZIP_HEADER_COMMENT = 1 << 4;
const int GZIP_HEADER_CRC = 1 << 1;
+typedef std::list<Request_ptr> RequestList;
+
class Connection : public NetChat
{
public:
virtual void handleClose()
{
NetChat::handleClose();
-
- if ((state == STATE_GETTING_BODY) && activeRequest) {
+
+ // closing of the connection from the server side when getting the body,
+ bool canCloseState = (state == STATE_GETTING_BODY);
+ if (canCloseState && activeRequest) {
// force state here, so responseComplete can avoid closing the
// socket again
state = STATE_CLOSED;
responseComplete();
} else {
+ if (activeRequest) {
+ activeRequest->setFailure(500, "server closed connection");
+ // remove the failed request from sentRequests, so it does
+ // not get restored
+ RequestList::iterator it = std::find(sentRequests.begin(),
+ sentRequests.end(), activeRequest);
+ if (it != sentRequests.end()) {
+ sentRequests.erase(it);
+ }
+ activeRequest = NULL;
+ }
+
state = STATE_CLOSED;
}
}
Request_ptr r = queuedRequests.front();
+ r->requestStart();
requestBodyBytesToSend = r->requestBodyLength();
stringstream headerData;
bool ok = push(headerData.str().c_str());
if (!ok) {
+ SG_LOG(SG_IO, SG_WARN, "HTTPClient: over-stuffed the socket");
// we've over-stuffed the socket, give up for now, let things
// drain down before trying to start any more requests.
return;
while (requestBodyBytesToSend > 0) {
char buf[4096];
- int len = 4096;
- r->getBodyData(buf, len);
+ int len = r->getBodyData(buf, 4096);
if (len > 0) {
requestBodyBytesToSend -= len;
if (!bufferSend(buf, len)) {
state = STATE_SOCKET_ERROR;
return;
}
+ // SG_LOG(SG_IO, SG_INFO, "sent body:\n" << string(buf, len) << "\n%%%%%%%%%");
} else {
- SG_LOG(SG_IO, SG_WARN, "asynchronous request body generation is unsupported");
+ SG_LOG(SG_IO, SG_WARN, "HTTP asynchronous request body generation is unsupported");
break;
}
}
- //std::cout << "did send request:" << r->url() << std::endl;
+ SG_LOG(SG_IO, SG_DEBUG, "did start request:" << r->url() <<
+ "\n\t @ " << reinterpret_cast<void*>(r.ptr()) <<
+ "\n\t on connection " << this);
// successfully sent, remove from queue, and maybe send the next
queuedRequests.pop_front();
sentRequests.push_back(r);
if (result == Z_OK || result == Z_STREAM_END) {
// nothing to do
} else {
- SG_LOG(SG_IO, SG_WARN, "got Zlib error:" << result);
+ SG_LOG(SG_IO, SG_WARN, "HTTP: got Zlib error:" << result);
return;
}
void responseComplete()
{
- //std::cout << "responseComplete:" << activeRequest->url() << std::endl;
+ // SG_LOG(SG_IO, SG_INFO, "*** responseComplete:" << activeRequest->url());
activeRequest->responseComplete();
client->requestFinished(this);
unsigned char* zlibOutputBuffer;
bool contentGZip, contentDeflate;
- std::list<Request_ptr> queuedRequests;
- std::list<Request_ptr> sentRequests;
+ RequestList queuedRequests;
+ RequestList sentRequests;
};
Client::Client()
void Client::update(int waitTimeout)
{
- NetChannel::poll(waitTimeout);
+ _poller.poll(waitTimeout);
ConnectionDict::iterator it = _connections.begin();
for (; it != _connections.end(); ) {
if (_connections.find(connectionId) == _connections.end()) {
Connection* con = new Connection(this);
con->setServer(host, port);
+ _poller.addChannel(con);
_connections[connectionId] = con;
}
#include <map>
#include <simgear/io/HTTPRequest.hxx>
+#include <simgear/io/sg_netChannel.hxx>
namespace simgear
{
std::string _proxy;
int _proxyPort;
std::string _proxyAuth;
+ NetChannelPoller _poller;
// connections by host
typedef std::map<std::string, Connection*> ConnectionDict;
return string();
}
+void Request::requestStart()
+{
+
+}
+
void Request::responseStart(const string& r)
{
const int maxSplit = 2; // HTTP/1.1 nnn reason-string
return "text/plain";
}
-void Request::getBodyData(char*, int& count) const
+int Request::getBodyData(char*, int maxCount) const
{
- count = 0;
- return;
+ return 0;
}
} // of namespace HTTP
/**
* Retrieve the body data bytes. Will be passed the maximum body bytes
- * to return in the buffer, and should update count with the actual number
+ * to return in the buffer, and must return the actual number
* of bytes written.
*/
- virtual void getBodyData(char* s, int& count) const;
+ virtual int getBodyData(char* s, int count) const;
/**
* retrieve the request body content type. Default is text/plain
protected:
Request(const std::string& url, const std::string method = "GET");
+ virtual void requestStart();
virtual void responseStart(const std::string& r);
virtual void responseHeader(const std::string& key, const std::string& value);
virtual void responseHeadersComplete();
namespace simgear {
-static NetChannel* channels = 0 ;
-
NetChannel::NetChannel ()
{
closed = true ;
accepting = false ;
write_blocked = false ;
should_delete = false ;
-
- next_channel = channels ;
- channels = this ;
+ poller = NULL;
}
NetChannel::~NetChannel ()
{
close();
-
- NetChannel* prev = NULL ;
-
- for ( NetChannel* ch = channels; ch != NULL;
- ch = ch -> next_channel )
- {
- if (ch == this)
- {
- ch = ch -> next_channel ;
- if ( prev != NULL )
- prev -> next_channel = ch ;
- else
- channels = ch ;
- next_channel = 0 ;
- break;
- }
- prev = ch ;
+ if (poller) {
+ poller->removeChannel(this);
}
}
}
}
-bool
-NetChannel::poll (unsigned int timeout)
-{
- if (!channels)
- return false ;
-
- enum { MAX_SOCKETS = 256 } ;
- Socket* reads [ MAX_SOCKETS+1 ] ;
- Socket* writes [ MAX_SOCKETS+1 ] ;
- Socket* deletes [ MAX_SOCKETS+1 ] ;
- int nreads = 0 ;
- int nwrites = 0 ;
- int ndeletes = 0 ;
- int nopen = 0 ;
- NetChannel* ch;
- for ( ch = channels; ch != NULL; ch = ch -> next_channel )
- {
- if ( ch -> should_delete )
- {
- assert(ndeletes<MAX_SOCKETS);
- deletes[ndeletes++] = ch ;
- }
- else if ( ! ch -> closed )
- {
- if (ch -> resolving_host )
- {
- ch -> handleResolve();
- continue;
- }
-
- nopen++ ;
- if (ch -> readable()) {
- assert(nreads<MAX_SOCKETS);
- reads[nreads++] = ch ;
- }
- if (ch -> writable()) {
- assert(nwrites<MAX_SOCKETS);
- writes[nwrites++] = ch ;
- }
- }
- }
- reads[nreads] = NULL ;
- writes[nwrites] = NULL ;
- deletes[ndeletes] = NULL ;
-
- int i ;
- for ( i=0; deletes[i]; i++ )
- {
- ch = (NetChannel*)deletes[i];
- delete ch ;
- }
-
- if (!nopen)
- return false ;
- if (!nreads && !nwrites)
- return true ; //hmmm- should we shutdown?
-
- Socket::select (reads, writes, timeout) ;
-
- for ( i=0; reads[i]; i++ )
- {
- ch = (NetChannel*)reads[i];
- if ( ! ch -> closed )
- ch -> handleReadEvent();
- }
-
- for ( i=0; writes[i]; i++ )
- {
- ch = (NetChannel*)writes[i];
- if ( ! ch -> closed )
- ch -> handleWriteEvent();
- }
-
- return true ;
-}
-
-void
-NetChannel::loop (unsigned int timeout)
-{
- while ( poll (timeout) ) ;
-}
-
-
void NetChannel::handleRead (void) {
SG_LOG(SG_IO, SG_WARN, "Network:" << getHandle() << ": unhandled read");
}
}
}
+void
+NetChannelPoller::addChannel(NetChannel* channel)
+{
+ assert(channel);
+ assert(channel->poller == NULL);
+
+ channel->poller = this;
+ channels.push_back(channel);
+}
+
+void
+NetChannelPoller::removeChannel(NetChannel* channel)
+{
+ assert(channel);
+ assert(channel->poller == this);
+ channel->poller = NULL;
+
+ ChannelList::iterator it = channels.begin();
+ for (; it != channels.end(); ++it) {
+ if (*it == channel) {
+ channels.erase(it);
+ return;
+ }
+ }
+}
+
+bool
+NetChannelPoller::poll(unsigned int timeout)
+{
+ if (channels.empty()) {
+ return false;
+ }
+
+ enum { MAX_SOCKETS = 256 } ;
+ Socket* reads [ MAX_SOCKETS+1 ] ;
+ Socket* writes [ MAX_SOCKETS+1 ] ;
+ int nreads = 0 ;
+ int nwrites = 0 ;
+ int nopen = 0 ;
+ NetChannel* ch;
+
+ ChannelList::iterator it = channels.begin();
+ while( it != channels.end() )
+ {
+ NetChannel* ch = *it;
+ if ( ch -> should_delete )
+ {
+ delete ch;
+ it = channels.erase(it);
+ continue;
+ }
+
+ ++it; // we've copied the pointer into ch
+ if ( ch->closed ) {
+ continue;
+ }
+
+ if (ch -> resolving_host )
+ {
+ ch -> handleResolve();
+ continue;
+ }
+
+ nopen++ ;
+ if (ch -> readable()) {
+ assert(nreads<MAX_SOCKETS);
+ reads[nreads++] = ch ;
+ }
+ if (ch -> writable()) {
+ assert(nwrites<MAX_SOCKETS);
+ writes[nwrites++] = ch ;
+ }
+ } // of array-filling pass
+
+ reads[nreads] = NULL ;
+ writes[nwrites] = NULL ;
+
+ if (!nopen)
+ return false ;
+ if (!nreads && !nwrites)
+ return true ; //hmmm- should we shutdown?
+
+ Socket::select (reads, writes, timeout) ;
+
+ for ( int i=0; reads[i]; i++ )
+ {
+ ch = (NetChannel*)reads[i];
+ if ( ! ch -> closed )
+ ch -> handleReadEvent();
+ }
+
+ for ( int i=0; writes[i]; i++ )
+ {
+ ch = (NetChannel*)writes[i];
+ if ( ! ch -> closed )
+ ch -> handleWriteEvent();
+ }
+
+ return true ;
+}
+
+void
+NetChannelPoller::loop (unsigned int timeout)
+{
+ while ( poll (timeout) ) ;
+}
+
+
} // of namespace simgear
#define SG_NET_CHANNEL_H
#include <simgear/io/raw_socket.hxx>
+
#include <string>
+#include <vector>
namespace simgear
{
+class NetChannelPoller;
+
class NetChannel : public Socket
{
bool closed, connected, accepting, write_blocked, should_delete, resolving_host ;
- NetChannel* next_channel ;
std::string host;
int port;
- friend bool netPoll (unsigned int timeout);
-
+ friend class NetChannelPoller;
+ NetChannelPoller* poller;
public:
NetChannel () ;
virtual void handleWrite (void);
virtual void handleAccept (void);
virtual void handleError (int error);
-
- static bool poll (unsigned int timeout = 0 ) ;
- static void loop (unsigned int timeout = 0 ) ;
+
+};
+
+class NetChannelPoller
+{
+ typedef std::vector<NetChannel*> ChannelList;
+ ChannelList channels;
+public:
+ void addChannel(NetChannel* channel);
+ void removeChannel(NetChannel* channel);
+
+ bool poll(unsigned int timeout = 0);
+ void loop(unsigned int timeout = 0);
};
} // of namespace simgear