X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=simgear%2Fthreads%2FSGQueue.hxx;h=bcdda1d35fad5f52650671908c392ea62144f119;hb=b47d1ad5fd8ed111cae99c1f65f5bb65a5371501;hp=532106bf3bdd74e429da907f670d9f8c4f64adda;hpb=b2a4cd488dfcfbf1d02fa41f2dfa5ad39aabb13a;p=simgear.git diff --git a/simgear/threads/SGQueue.hxx b/simgear/threads/SGQueue.hxx index 532106bf..bcdda1d3 100644 --- a/simgear/threads/SGQueue.hxx +++ b/simgear/threads/SGQueue.hxx @@ -3,15 +3,11 @@ #include -#if defined ( SG_HAVE_STD_INCLUDES ) -# include -#else -# include -#endif - +#include #include -#include "SGThread.hxx" -#include "SGGuard.hxx" +#include +#include +#include /** * SGQueue defines an interface for a FIFO. @@ -78,7 +74,7 @@ protected: /** * A simple thread safe queue. All access functions are guarded with a mutex. */ -template +template class SGLockedQueue : public SGQueue { public: @@ -99,7 +95,7 @@ public: * @return bool True if queue is empty, otherwisr false. */ virtual bool empty() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); return this->fifo.empty(); } @@ -109,7 +105,7 @@ public: * @param T object to add. */ virtual void push( const T& item ) { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); this->fifo.push( item ); } @@ -119,7 +115,7 @@ public: * @return T next available object. */ virtual T front() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); assert( ! this->fifo.empty() ); T item = this->fifo.front(); return item; @@ -131,7 +127,7 @@ public: * @return T next available object. */ virtual T pop() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); //if (fifo.empty()) throw NoSuchElementException(); assert( ! this->fifo.empty() ); // if (fifo.empty()) @@ -150,7 +146,7 @@ public: * @return size_t size of queue. */ virtual size_t size() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); return this->fifo.size(); } @@ -183,13 +179,13 @@ public: /** * Destroy this queue. */ - ~SGBlockingQueue() { mutex.unlock(); } + ~SGBlockingQueue() {} /** * */ virtual bool empty() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); return this->fifo.empty(); } @@ -199,7 +195,7 @@ public: * @param T object to add. */ virtual void push( const T& item ) { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); this->fifo.push( item ); not_empty.signal(); } @@ -211,7 +207,7 @@ public: * @return T next available object. */ virtual T front() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); assert(this->fifo.empty() != true); //if (fifo.empty()) throw ?? @@ -227,10 +223,10 @@ public: * @return T next available object. */ virtual T pop() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); while (this->fifo.empty()) - not_empty.wait(mutex); + not_empty.wait(&mutex); assert(this->fifo.empty() != true); //if (fifo.empty()) throw ?? @@ -246,7 +242,7 @@ public: * @return size_t size of queue. */ virtual size_t size() { - SGGuard g(mutex); + OpenThreads::ScopedLock g(mutex); return this->fifo.size(); } @@ -255,12 +251,12 @@ private: /** * Mutex to serialise access. */ - SGMutex mutex; + OpenThreads::Mutex mutex; /** * Condition to signal when queue not empty. */ - SGPthreadCond not_empty; + OpenThreads::Condition not_empty; private: // Prevent copying. @@ -268,4 +264,148 @@ private: SGBlockingQueue& operator=( const SGBlockingQueue& ); }; + +/** + * A guarded deque blocks threads trying to retrieve items + * when none are available. + */ +template +class SGBlockingDeque +{ +public: + /** + * Create a new SGBlockingDequeue. + */ + SGBlockingDeque() {} + + /** + * Destroy this dequeue. + */ + ~SGBlockingDeque() {} + + /** + * + */ + virtual void clear() { + OpenThreads::ScopedLock g(mutex); + this->queue.clear(); + } + + /** + * + */ + virtual bool empty() { + OpenThreads::ScopedLock g(mutex); + return this->queue.empty(); + } + + /** + * Add an item to the front of the queue. + * + * @param T object to add. + */ + virtual void push_front( const T& item ) { + OpenThreads::ScopedLock g(mutex); + this->queue.push_front( item ); + not_empty.signal(); + } + + /** + * Add an item to the back of the queue. + * + * @param T object to add. + */ + virtual void push_back( const T& item ) { + OpenThreads::ScopedLock g(mutex); + this->queue.push_back( item ); + not_empty.signal(); + } + + /** + * View the item from the head of the queue. + * Calling thread is not suspended + * + * @return T next available object. + */ + virtual T front() { + OpenThreads::ScopedLock g(mutex); + + assert(this->queue.empty() != true); + //if (queue.empty()) throw ?? + + T item = this->queue.front(); + return item; + } + + /** + * Get an item from the head of the queue. + * If no items are available then the calling thread is suspended + * + * @return T next available object. + */ + virtual T pop_front() { + OpenThreads::ScopedLock g(mutex); + + while (this->queue.empty()) + not_empty.wait(&mutex); + + assert(this->queue.empty() != true); + //if (queue.empty()) throw ?? + + T item = this->queue.front(); + this->queue.pop_front(); + return item; + } + + /** + * Get an item from the tail of the queue. + * If no items are available then the calling thread is suspended + * + * @return T next available object. + */ + virtual T pop_back() { + OpenThreads::ScopedLock g(mutex); + + while (this->queue.empty()) + not_empty.wait(&mutex); + + assert(this->queue.empty() != true); + //if (queue.empty()) throw ?? + + T item = this->queue.back(); + this->queue.pop_back(); + return item; + } + + /** + * Query the size of the queue + * + * @return size_t size of queue. + */ + virtual size_t size() { + OpenThreads::ScopedLock g(mutex); + return this->queue.size(); + } + +private: + + /** + * Mutex to serialise access. + */ + OpenThreads::Mutex mutex; + + /** + * Condition to signal when queue not empty. + */ + OpenThreads::Condition not_empty; + +private: + // Prevent copying. + SGBlockingDeque( const SGBlockingDeque& ); + SGBlockingDeque& operator=( const SGBlockingDeque& ); + +protected: + std::deque queue; +}; + #endif // SGQUEUE_HXX_INCLUDED