Reputation: 1107
After reading some other articles, I got to know that I could implement a c++ blocking queue like this:
template<typename T>
class BlockingQueue {
public:
std::mutex mtx;
std::condition_variable not_full;
std::condition_variable not_empty;
std::queue<T> queue;
size_t capacity{5};
BlockingQueue()=default;
BlockingQueue(int cap):capacity(cap) {}
BlockingQueue(const BlockingQueue&)=delete;
BlockingQueue& operator=(const BlockingQueue&)=delete;
void push(const T& data) {
std::unique_lock<std::mutex> lock(mtx);
while (queue.size() >= capacity) {
not_full.wait(lock, [&]{return queue.size() < capacity;});
}
queue.push(data);
not_empty.notify_all();
}
T pop() {
std::unique_lock<std::mutex> lock(mtx);
while (queue.empty()) {
not_empty.wait(lock, [&]{return !queue.empty();});
}
T res = queue.front();
queue.pop();
not_full.notify_all();
return res;
}
bool empty() {
std::unique_lock<std::mutex> lock(mtx);
return queue.empty();
}
size_t size() {
std::unique_lock<std::mutex> lock(mtx);
return queue.size();
}
void set_capacity(const size_t capacity) {
this->capacity = (capacity > 0 ? capacity : 10);
}
};
This works for me, but I do not know how could I shut it down if I start it in the background thread:
void main() {
BlockingQueue<float> q;
bool stop{false};
auto fun = [&] {
std::cout << "before entering loop\n";
while (!stop) {
q.push(1);
}
std::cout << "after entering loop\n";
};
std::thread t_bg(fun);
t_bg.detach();
// Some other tasks here
stop = true;
// How could I shut it down before quit here, or could I simply let the operation system do that when the whole program is over?
}
The problem is that when I want to shut down the background thread, the background thread might have been sleeping because the queue is full and the push operation is blocked. How could I stop it when I want the background thread to stop ?
Upvotes: 0
Views: 723
Reputation: 9113
One easy way would be to add a flag that you set from outside when you want to abort a pop()
operation that's already blocked. And then you'd have to decide what an aborted pop()
is going to return. One way is for it to throw an exception, another would be to return an std::optional<T>
. Here's the first method (I'll only write the changed parts.)
Add this type wherever you think is appropriate:
struct AbortedPopException {};
Add this to your class fields:
mutable std::atomic<bool> abort_flag = false;
Also add this method:
void abort () const {
abort_flag = true;
}
Change the while
loop in the pop()
method like this: (you don't need the while
at all, since I believe the condition variable wait()
method that accepts a lambda does not wake up/return spuriously; i.e. the loop is inside the wait already.)
not_empty.wait(lock, [this]{return !queue.empty() || abort_flag;});
if (abort_flag)
throw AbortedPopException{};
That's it (I believe.)
In your main()
, when you want to shut the "consumer" down you can call abort()
on your queue. But you'll have to handle the thrown exception there as well. It's your "exit" signal, basically.
Some side notes:
Don't detach from threads! Specially here where AFAICT there is no reason for it (and some actual danger too.) Just signal them to exit (in any manner appropriate) and join()
them.
Your stop
flag should be atomic. You read from it in your background thread and write to it from your main thread, and those can (and in fact do) overlap in time, so... data race!
I don't understand why you have a "full" state and "capacity" in your queue. Think about whether they are necessary.
UPDATE 1: In response to OP's comment about detaching... Here's what happens in your main thread:
main()
, you signal the thread to stop (e.g. by setting stop
flag to true
)join()
with the thread.It is true that your main thread will block while it is waiting for the thread to pick up the "stop" signal, exit its loop, and return from its thread function, but that's a very very short wait. And you have nothing else to do. More importantly, you'll know that your thread exited cleanly and predictably, and from that point on, you know definitely that that thread won't be running (not important for you here, but could be critical for some other threaded task.)
That is the pattern that you usually want to follow in spawning worker thread that loop over a short task.
Update 2: About "full" and "capacity" of the queue. That's fine. It's certainly your decision. No problem with that.
Update 3: About "throwing" vs. returning an "empty" object to signal an aborted "blocking pop()
". I don't think there is anything wrong with throwing like that; specially since it is very very rare (just happens once at the end of the operation of the producer/consumer.) However, if all T
types that you want to store in your Queue
have an "invalid" or "empty" state, then you certainly can use that. But throwing is more general, if more "icky" to some people.
Upvotes: 1