Reputation: 3
I have this simple little queue where one task reads from a file into the queue and several tasks unpack the content. It works for a while, but eventually crashes because the queue is empty, even though I check in the line before that the queue is not empty! (See the comment in the code.)
#pragma once
#include <optional>
#include <future>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <cstdint>
template<class T, uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
using optional_queue_pair = std::optional<T>;
bool put(optional_queue_pair&& p)
{
if (my_queue.size() >= MAX_SIZE) {
std::unique_lock my_lock(my_mutex);
my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
}
std::lock_guard<std::mutex> my_lock(my_mutex);
my_queue.push(std::move(p));
my_cv_add.notify_one();
return true;
}
optional_queue_pair get()
{
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
std::scoped_lock my_lock(my_mutex);
optional_queue_pair ret = std::move(my_queue.front()); //some times my_queue.size() == 0 why????
my_queue.pop();
my_cv_remove.notify_one();
return ret;
}
private:
std::queue<optional_queue_pair> my_queue;
std::mutex my_mutex;
std::condition_variable my_cv_add;
std::condition_variable my_cv_remove;
};
I have tried to guard against this, but apparently I do not understand the details of mutex locking!
Upvotes: 0
Views: 177
Reputation: 18090
Condition variables may wake multiple threads even when you use notify_one (see spurious wakeup).
You need to check again that the queue is not empty after you lock the mutex. In your code the unique_lock is released at the end of the if block and the mutex is locked again afterwards, so another consumer can empty the queue in between. Two threads may have woken up and be waiting on the lock; only the first one will find something in the queue, and the other will find it empty. You also need to put the whole thing in a while (true) loop, because the queue is not allowed to fail:
optional_queue_pair get()
{
while (true) // retry until succeed
{
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
std::lock_guard my_lock(my_mutex); // two threads awaken one waiting on lock
if (my_queue.size() != 0) // first thread sees true, second sees false
{
optional_queue_pair ret = std::move(my_queue.front());
my_queue.pop();
my_cv_remove.notify_all(); // because notify_one may not awaken the producer.
return ret;
}
}
}
For production I would probably use something like the Boost lock-free queue, which is a multiple-producer, multiple-consumer lock-free queue and is extensively tested, instead of rolling your own (slower) implementation of a multi-consumer queue.
Upvotes: 0
Reputation: 138
This is wrong:
if (my_queue.size() == 0) {
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
You should lock the mutex before checking my_queue.size(). Otherwise size() may return a value greater than 0 while another thread pops from my_queue at the same time, so the queue can be empty by the time you call front().
The correct version would be:
std::unique_lock my_lock(my_mutex);
if (my_queue.size() == 0) {
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
}
UPD:
Thanks to Ahmed AEK: there is no need for the "if", because wait with a predicate already checks the condition before blocking. We can do it much more simply:
std::unique_lock my_lock(my_mutex);
my_cv_add.wait(my_lock, [this] { return my_queue.size() != 0; });
See documentation: https://en.cppreference.com/w/cpp/thread/condition_variable
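For completeness, here is a minimal sketch of the whole queue with that idea applied to both put() and get(): the lock is taken once, the predicate is checked under it, and the lock is kept until the element has been pushed or popped. The class and member names come from the question. The run_demo helper, and the convention that an empty optional tells a consumer to stop, are my own assumptions for illustration and not something the question states.

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <class T, std::uint32_t MAX_SIZE>
class spmc_fifo_queue
{
public:
    using optional_queue_pair = std::optional<T>;

    bool put(optional_queue_pair&& p)
    {
        std::unique_lock<std::mutex> my_lock(my_mutex);
        // The predicate is evaluated with the mutex held, and the mutex
        // stays held until the push below is done.
        my_cv_remove.wait(my_lock, [this] { return my_queue.size() < MAX_SIZE; });
        my_queue.push(std::move(p));
        my_lock.unlock();        // unlock first so the woken thread does not block on the mutex
        my_cv_add.notify_one();
        return true;
    }

    optional_queue_pair get()
    {
        std::unique_lock<std::mutex> my_lock(my_mutex);
        // No other consumer can pop between this check and front()/pop().
        my_cv_add.wait(my_lock, [this] { return !my_queue.empty(); });
        optional_queue_pair ret = std::move(my_queue.front());
        my_queue.pop();
        my_lock.unlock();
        my_cv_remove.notify_one();
        return ret;
    }

private:
    std::queue<optional_queue_pair> my_queue;
    std::mutex my_mutex;
    std::condition_variable my_cv_add;
    std::condition_variable my_cv_remove;
};

// Hypothetical helper (not from the question): one producer pushes 1..items,
// `consumers` threads add up what they receive, and the total is returned.
// Assumption: an empty optional means "no more data, stop".
long long run_demo(int consumers, int items)
{
    spmc_fifo_queue<int, 8> q;
    std::mutex sum_mutex;
    long long sum = 0;

    std::vector<std::thread> workers;
    for (int c = 0; c < consumers; ++c) {
        workers.emplace_back([&] {
            long long local = 0;
            while (auto v = q.get())   // loop ends on an empty optional
                local += *v;
            std::lock_guard<std::mutex> lock(sum_mutex);
            sum += local;
        });
    }

    for (int i = 1; i <= items; ++i)
        q.put(i);
    for (int c = 0; c < consumers; ++c)
        q.put(std::nullopt);           // one stop marker per consumer

    for (auto& w : workers)
        w.join();
    return sum;
}
```

Calling run_demo(4, 1000) runs four consumers against one producer. Because the emptiness check and front()/pop() happen under the same lock, no consumer should ever see an empty queue at front().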
Upvotes: 2