Ben
Ben

Reputation: 7802

C++ Equivalent to Java's BlockingQueue

I'm in the process of porting some Java code over to C++, and one particular section makes use of a BlockingQueue to pass messages from many producers to a single consumer.

If you are not familiar with what a Java BlockingQueue is, it is just a queue that has a hard capacity, which exposes thread safe methods to put() and take() from the queue. put() blocks if the queue is full, and take() blocks if the queue is empty. Also, timeout-sensitive versions of these methods are supplied.

Timeouts are relevant to my use-case, so a recommendation that supplies those is ideal. If not, I can code up some myself.

I've googled around and quickly browsed the Boost libraries and I'm not finding anything like this. Maybe I'm blind here...but does anyone know of a good recommendation?

Thanks!

Upvotes: 53

Views: 35565

Answers (4)

U should write the class of semephore first

#ifndef SEMEPHORE_H
#define SEMEPHORE_H
#include <mutex>
#include <condition_variable>

class semephore {
public:
    semephore(int count = 0)
        : count(count),
          m(),
          cv()
    {

    }

    void await() {
        std::unique_lock<std::mutex> lk(m);
        --count;
        if (count < 0) {
            cv.wait(lk);
        }
    }

    void post() {
        std::unique_lock<std::mutex> lk(m);
        ++count;
        if (count <= 0) {
            cv.notify_all();
        }
    }
    
private:
    int count;
    std::mutex m;
    std::condition_variable cv;
};

#endif // SEMEPHORE_H

now the blocked_queue can use the semephore to deal with it

#ifndef BLOCKED_QUEUE_H
#define BLOCKED_QUEUE_H
#include <list>
#include "semephore.h"

template <typename T>
class blocked_queue {
public:
    blocked_queue(int count) 
        : s_products(),
          s_free_space(count),
          li()
    {

    }

    void put(const T &t) {
        s_free_space.await();
        li.push_back(t);
        s_products.post();
    }

    T take() {
        s_products.await();
        T res = li.front();
        li.pop_front();
        s_free_space.post();
        return res;
    }
private:
    semephore s_products;
    semephore s_free_space;
    std::list<T> li;
};

#endif // BLOCKED_QUEUE_H

Upvotes: 1

SkyWalker
SkyWalker

Reputation: 14309

OK I'm a bit late to the party but I think this is a better fit for the Java's BlockingQueue implementation. Here I too use one mutex and two conditions to look after not full and not empty. IMO a BlockingQueue makes more sense with limited capacity which I didn't see in the other answers. I include a simple test scenario too:

#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

template<typename T>
class blocking_queue {
private:
    size_t _capacity;
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _not_full;
    std::condition_variable _not_empty;

public:
    inline blocking_queue(size_t capacity) : _capacity(capacity) {
        // empty
    }

    inline size_t size() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.size();
    }

    inline bool empty() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    inline void push(const T& elem) {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is full
            while (_queue.size() >= _capacity) {
                _not_full.wait(lock);
            }
            std::cout << "pushing element " << elem << std::endl;
            _queue.push(elem);
        }
        _not_empty.notify_all();
    }

    inline void pop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            std::cout << "popping element " << _queue.front() << std::endl;
            _queue.pop();
        }
        _not_full.notify_one();
    }

    inline const T& front() {
        std::unique_lock<std::mutex> lock(_mutex);

        // wait while the queue is empty
        while (_queue.size() == 0) {
            _not_empty.wait(lock);
        }
        return _queue.front();
    }
};

int main() {
    blocking_queue<int> queue(5);

    // create producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.push(i);
            // produces too fast
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }));
    }

    // create consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.pop();
            // consumes too slowly
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }));
    }

    std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
        thread.join();
    });

    std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
        thread.join();
    });

    return EXIT_SUCCESS;
}

Upvotes: 0

Serge Rogatch
Serge Rogatch

Reputation: 15020

Here's an example of a blocking queue with shutdown request feature:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};

Upvotes: 6

Dietmar K&#252;hl
Dietmar K&#252;hl

Reputation: 153792

It isn't fixed size and it doesn't support timeouts but here is a simple implementation of a queue I had posted recently using C++ 2011 constructs:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.

Upvotes: 73

Related Questions