Queue
A queue is a first-in-first-out (FIFO) data structure. It is defined by two basic operations: inserting a new item and removing an item.

title

Following the FIFO rule, a new element is always placed at the end of the queue, and the remove operation deletes the first element of the queue. The C++ code below gives a simple insight into a queue implementation. It is a dynamic implementation based on nodes linked together by pointers. The inner structure, called Node, holds the value and a pointer to the next node. The queue wraps around the nodes and provides methods for adding and removing elements. The queue keeps two Node pointers: pHead for accessing the first element of the queue and pTail for appending at its end.

In [1]:
#include <stdexcept>
    #include <utility>
    
    namespace DataStructure {
    
        /// Singly linked first-in-first-out queue.
        ///
        /// Elements are appended at the tail with in() and taken from the head
        /// with out(). The queue owns its nodes through raw pointers, so copying
        /// is disabled: a member-wise copy would share the node chain and both
        /// destructors would then delete the same nodes.
        template<typename T>
        class Queue {
        public:
            Queue() = default;
            Queue(const Queue&) = delete;            // disable copying
            Queue& operator=(const Queue&) = delete; // disable assignment
            virtual ~Queue();
    
            /// Appends a copy of the item at the tail.
            void in(T const&);
            /// Appends the item at the tail, taking it by rvalue reference.
            void in(T&&);
            /// Removes the head element and returns its value;
            /// throws std::runtime_error when the queue is empty.
            auto out() -> T;
            /// Returns a copy of the head value without removing it;
            /// throws std::runtime_error when the queue is empty.
            auto first() const -> T;
            /// True when the queue holds no element.
            bool isEmpty() const;
    
            /// Writes the stored values to std::cout (debugging aid).
            void print() const;
        private:
            /// One link of the chain: the stored value plus the next node.
            struct Node {
                T value;
                Node* pNext;
    
                // Value-initialise instead of value(0), so the default
                // constructor is also valid for non-arithmetic T.
                Node() : value(), pNext(nullptr) {}
                explicit Node(const T& _value) : value(_value), pNext(nullptr) {}
                Node(const T& _value, Node* _pNext) : value(_value), pNext(_pNext) {}
                explicit Node(T&& _value) : value(std::move(_value)), pNext(nullptr) {}
            };
    
            Node* pHead = nullptr; // first node, nullptr when the queue is empty
            Node* pTail = nullptr; // last node, where new elements are linked
        };
    }
    

Like the stack, the queue also has a destructor and an isEmpty method, for deleting the queue and for checking whether the queue is empty.

In [2]:
/// Releases every node still held by the queue.
    ///
    /// The chain is walked and freed directly rather than through out():
    /// out() builds a T for each element only to discard it, and an exception
    /// from that construction escaping a destructor would terminate the program.
    template<typename T>
    DataStructure::Queue<T>::~Queue() {
        while (pHead != nullptr) {
            Node* const next = pHead->pNext;
            delete pHead;
            pHead = next;
        }
        pTail = nullptr;
    }
    
    /// Tells whether the queue currently holds no element.
    ///
    /// @return true when the head pointer is null, false otherwise.
    template<typename T>
    bool DataStructure::Queue<T>::isEmpty() const {
        return !pHead;
    }
    

The in methods add a new element to the tail of the queue. The second in overload takes an rvalue reference and uses move semantics.

In [3]:
/// Appends a copy of the given item to the end of the queue.
    ///
    /// @param _item value copied into the newly allocated tail node.
    template<typename T>
    void DataStructure::Queue<T>::in(T const& _item) {
        Node* const node = new Node(_item);
        if (!isEmpty())
            pTail->pNext = node; // link behind the current last node
        else
            pHead = node;        // first element: it is the head as well
        pTail = node;
    }
    
    /// Appends the given item to the end of the queue by moving it.
    ///
    /// T is the class template parameter, not a deduced one, so T&& is a
    /// plain rvalue reference here and std::move is the matching cast
    /// (std::forward is meant for forwarding references only).
    ///
    /// @param _item rvalue whose contents are moved into the new tail node.
    template<typename T>
    void DataStructure::Queue<T>::in(T&& _item) {
        Node* const node = new Node(std::move(_item));
        if (isEmpty())
            pHead = node;        // first element: it is the head as well
        else
            pTail->pNext = node; // link behind the current last node
        pTail = node;
    }
    

The first method returns the first value, while the out method returns the first element and also removes it. The in methods update the tail pointer and the out method updates the head pointer.

In [4]:
/// Removes the first element of the queue and returns its value.
    ///
    /// @return the value that was stored at the head.
    /// @throws std::runtime_error ("UnderFlowException") if the queue is empty.
    template<typename T>
    auto DataStructure::Queue<T>::out() -> T {
        if (isEmpty())
            throw std::runtime_error("UnderFlowException");
    
        Node* const old = pHead;
        // The node is deleted right below, so its value can be moved out
        // instead of copied.
        T value = std::move(old->value);
        pHead = old->pNext;
        if (pHead == nullptr)
            pTail = nullptr; // last element removed: do not leave the tail dangling
        delete old;
        return value;
    }
    
    /// Gives back a copy of the first element without removing it.
    ///
    /// @return copy of the value stored at the head of the queue.
    /// @throws std::runtime_error ("UnderFlowException") if the queue is empty.
    template<typename T>
    auto DataStructure::Queue<T>::first() const -> T {
        if (!isEmpty())
            return pHead->value;
    
        throw std::runtime_error("UnderFlowException");
    }
    

The print method is not part of the queue's core interface; it is only there for printing the stored data.

In [ ]:
/// Writes every stored value to std::cout, head first, each followed by a
    /// space, and finishes with two newlines.
    template<typename T>
    void DataStructure::Queue<T>::print() const {
        Node* cursor = pHead;
        while (cursor != nullptr) {
            std::cout << cursor->value << " ";
            cursor = cursor->pNext;
        }
        std::cout << "\n\n";
    }
    

As with the stack, the thread-safe queue is just a wrapper: it provides an in (push) and an out (pop) method that can be called from several threads in parallel.

In [ ]:
#include <thread>
    #include <mutex>
    #include <condition_variable>
    
    namespace DataStructure {
        /// Wrapper that pairs a Queue<T> with a mutex and a condition variable.
        ///
        /// Instances can be default-constructed only; copy construction and
        /// copy assignment are deleted.
        template <typename T>
        class ThreadSafeQueue {
        public:
            ThreadSafeQueue() = default;
            ThreadSafeQueue(const ThreadSafeQueue&) = delete;            // not copyable
            ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete; // not assignable
    
            /// Inserts a copy of the given item.
            void in(const T&);
            /// Removes and returns an item.
            T out();
        private:
            DataStructure::Queue<T> queue_; // underlying storage
            std::mutex mutex_;              // locked by in() and out()
            std::condition_variable cond_;  // waited on by out(), notified by in()
        };
    }
    
    /// Waits until the queue holds an element, then removes and returns it.
    ///
    /// @return the value removed from the front of the underlying queue.
    template <typename T>
    T DataStructure::ThreadSafeQueue<T>::out() {
        std::unique_lock<std::mutex> mlock(mutex_);
        // The predicate overload re-checks emptiness after every wake-up,
        // which is the same loop as `while (empty) wait`.
        cond_.wait(mlock, [this] { return !queue_.isEmpty(); });
        // Queue::out() already hands back the removed value; calling first()
        // beforehand only produced additional copies.
        return queue_.out();
    }
    
    /// Inserts a copy of the item and notifies one thread waiting on the
    /// condition variable.
    ///
    /// @param item value to enqueue.
    template <typename T>
    void DataStructure::ThreadSafeQueue<T>::in(const T& item) {
        {
            // Hold the mutex only for the insertion itself.
            std::unique_lock<std::mutex> guard(mutex_);
            queue_.in(item);
        } // mutex released here, before the notification
        cond_.notify_one();
    }
    

In the main part I demonstrate move semantics and the parallel use of the queue.

In [ ]:
#include <iostream>
    #include <random>
    #include <functional>
    #include <ctime>
    #include <exception>
    
    /// Inserts the integers 0 to 999 into the queue, printing a line to
    /// std::cout before each insertion.
    void produce(DataStructure::ThreadSafeQueue<int>& q) {
        int value = 0;
        while (value < 1000) {
            std::cout << "Pushing " << value << "\n";
            q.in(value);
            ++value;
        }
    }
    
    /// Takes 250 items out of the queue and prints each one to std::cout
    /// together with the consumer id.
    void consume(DataStructure::ThreadSafeQueue<int>& q, unsigned int id) {
        int remaining = 250;
        while (remaining-- > 0) {
            const auto item = q.out();
            std::cout << "Consumer " << id << " popped " << item << "\n";
        }
    }
    
In [ ]:
std::cout << "Testing data structure Queue ...\n\n";
    
    ////////// Using random generator engine //////////
    // Seed from the current time; named cast and nullptr instead of a C-style cast and 0.
    std::default_random_engine gen(static_cast<unsigned int>(std::time(nullptr)));
    std::uniform_int_distribution<int> distribution(1, 50);
    auto randInt = std::bind(distribution, gen); // randInt() generates an integer in the range 1-50
    
    DataStructure::Queue<int> queue;
    
    const int size = randInt();
    for (int i = 0; i < size; ++i)
        queue.in(randInt()); // rvalue argument: calls in(T&& _item) (C++11 move semantics)
    
    int tmp = 10;
    queue.in(tmp); // lvalue argument: calls in(T const& _item)
    
    queue.print();
    
    std::cout << "Out:  " << queue.out() << "\n";
    std::cout << "Out:  " << queue.out() << "\n\n";
    queue.print();
    
    // Drain the queue so that the next out() hits the underflow path.
    while (!queue.isEmpty())
        queue.out();
    try {
        queue.out(); // exception UnderFlow
    }
    catch (const std::exception& e) { // catch by const reference
        std::cerr << e.what() << "\n";
    }
    
    ////////// Thread safe queue //////////
    
    DataStructure::ThreadSafeQueue<int> q;
    
    using namespace std::placeholders;
    
    // std::thread forwards its arguments to the callable itself, so no
    // std::bind wrapper is needed; std::ref keeps q passed by reference.
    
    // producer thread: pushes 1000 items
    std::thread prod1(produce, std::ref(q));
    
    // consumer threads: 4 x 250 pops match the 1000 pushed items
    std::thread consumer1(consume, std::ref(q), 1u);
    std::thread consumer2(consume, std::ref(q), 2u);
    std::thread consumer3(consume, std::ref(q), 3u);
    std::thread consumer4(consume, std::ref(q), 4u);
    
    prod1.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();
    consumer4.join();