Pages

Tuesday, November 13, 2012

I recently had to build my own blocking queue for a Qt project which I'm developing in Visual Studio 2010.

The BlockingQueue is designed to take advantage of the Qt library. In particular, I use QWaitCondition and QMutex for signalling. The underlying queue uses QSharedPointer to store the items in the queue, but some of you may want to use QSharedDataPointer instead (that choice is up to you).

Anyway, I thought I'd share it with all of you:

#ifndef BLOCKING_QUEUE_H
#define BLOCKING_QUEUE_H

#include <QObject>
#include <QSharedPointer>
#include <QWaitCondition>
#include <QMutex>
#include <queue>

namespace Concurrency
{
    /**
     * @brief FIFO queue of QSharedPointer<Data> items that lets consumers
     *        block until an item is available or the queue is closed.
     *
     * Every member function acquires the internal mutex, so one instance may
     * be shared between producer and consumer threads. Once Close() has been
     * called the queue rejects new items and TryDequeue() always fails, even
     * if items were still queued at the time of closing.
     *
     * @tparam Data Type of the objects referenced by the queued pointers.
     */
    template<typename Data>
    class BlockingQueue
    {
    private:
        // mutable so that the const observers (Size, IsClosed) can still lock.
        mutable QMutex _mutex;
        QWaitCondition _monitor;
        // Only read or written while _mutex is held; the mutex (not
        // 'volatile') is what makes the flag visible across threads.
        bool _closed;
        std::queue<QSharedPointer<Data>> _queue;

    public:
        /** @brief Creates an empty, open queue. */
        BlockingQueue()
            : _closed(false)
        {
        }

        /**
         * @brief Marks the queue as closed and wakes every thread currently
         *        blocked in TryDequeue().
         *
         * Calling Close() on an already closed queue does nothing.
         */
        void Close()
        {
            QMutexLocker locker(&_mutex);
            if(_closed)
            {
                return;
            }

            _closed = true;
            // Waiters re-check _closed once they reacquire the mutex.
            _monitor.wakeAll();
        }

        /** @return Number of items currently stored in the queue. */
        size_t Size() const
        {
            QMutexLocker locker(&_mutex);
            return _queue.size();
        }

        /** @return true once Close() has been called. */
        bool IsClosed() const
        {
            // Taken under the lock so the read cannot race with Close().
            QMutexLocker locker(&_mutex);
            return _closed;
        }

        /**
         * @brief Appends an item to the back of the queue.
         *
         * @param data Item to store.
         * @return true if the item was stored, false if the queue is closed.
         */
        bool Enqueue(QSharedPointer<Data> data)
        {
            QMutexLocker locker(&_mutex);

            // A closed queue accepts no further items
            if(_closed)
            {
                return false;
            }

            _queue.push(data);

            // Consumers only wait while the queue is empty, so a wake-up is
            // needed only on the empty -> non-empty transition.
            if(_queue.size() == 1)
            {
                _monitor.wakeAll();
            }

            return true;
        }

        /**
         * @brief Removes the item at the front of the queue, blocking while
         *        the queue is empty.
         *
         * @param value Receives the dequeued item; left untouched on failure.
         * @param time  Timeout in milliseconds handed to
         *              QWaitCondition::wait(); ULONG_MAX waits without a
         *              timeout. The timeout applies to each individual wait,
         *              so it starts over if this thread is woken but finds
         *              the queue empty again.
         * @return true if an item was dequeued; false if the queue is closed
         *         or the wait timed out.
         */
        bool TryDequeue(QSharedPointer<Data>& value, unsigned long time = ULONG_MAX)
        {
            QMutexLocker locker(&_mutex);

            for(;;)
            {
                // A closed queue never hands out items, even queued ones
                if(_closed)
                {
                    return false;
                }

                if(!_queue.empty())
                {
                    break;
                }

                // wait() releases _mutex while blocked and reacquires it
                // before returning; false means the timeout elapsed.
                if(!_monitor.wait(&_mutex, time))
                {
                    return false;
                }
            }

            // Dequeue the next item from the queue
            value = _queue.front();
            _queue.pop();
            return true;
        }
    };

#include "BlockingQueue.cpp"
}
#endif //BLOCKING_QUEUE_H
There are some issues with closing the queue at the moment. Working to fix it.