Reputation: 1592
First a little context: I'm in the process of learning about threading in C++11 and for this purpose, I'm trying to build a small actor
class, essentially (I left the exception handling and propagation stuff out) like so:
class actor {
private: std::atomic<bool> stop;
private: std::condition_variable interrupt;
private: std::thread actor_thread;
private: message_queue incoming_msgs;
public: actor()
: stop(false),
actor_thread([&]{ run_actor(); })
public: virtual ~actor() {
// if the actor is destroyed, we must ensure the thread dies too
stop = true;
// to this end, we have to interrupt the actor thread which is most probably
// waiting on the incoming_msgs queue:
private: virtual void run_actor() {
try {
// wait for new message and process it
// but interrupt the waiting process if interrupt is signaled:
catch(interrupted_exception) {
// ...
private: virtual void process(const message&) = 0;
// ...
Every actor runs in its own actor_thread
, waits on a new incoming message on incoming_msgs
and -- when a message arrives -- processes it.
The actor_thread
is created together with the actor
and has to die together with it, which is why I need some kind of interrupt mechanism in the message_queue::wait_and_pop(std::condition_variable interrupt)
Essentially, I require that wait_and_pop
blocks until either
a) a new message
arrives or
b) until the interrupt
is fired, in which case -- ideally -- an interrupted_exception
is to be thrown.
The arrival of a new message in the message_queue
is presently modeled also by a std::condition_variable new_msg_notification
// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
std::unique_lock<std::mutex> lock(mutex);
// How to interrupt the following, when interrupt fires??
return !queue.empty();
auto msg(std::move(queue.front()));
return msg;
To cut the long story short, the question is this: How do I interrupt the waiting for a new message in new_msg_notification.wait(...)
when the interrupt
is triggered (without introducing a time-out)?
Alternatively, the question may be read as: How do I wait until any one of two std::condition_variable
s are signaled?
One naive approach seems to be not to use std::condition_variable
at all for the interrupt and instead just use an atomic flag std::atomic<bool> interrupted
and then busy wait on new_msg_notification
with a very small time-out until either a new message has arrived or until true==interrupted
. However, I would very much like to avoid busy waiting.
From the comments and the answer by pilcrow, it looks like there are basically two approaches possible.
For those of you interested, my implementation goes as follows. The condition variable in my case is actually a semaphore
(because I like them more and because I liked the exercise of doing so). I equipped this semaphore with an associated interrupt
which can be obtained from the semaphore via semaphore::get_interrupt()
. If now one thread blocks in semaphore::wait()
, another thread has the possibility to call semaphore::interrupt::trigger()
on the interrupt of the semaphore, causing the first thread to unblock and propagate an interrupt_exception
interrupt_exception {};
semaphore {
public: class interrupt;
private: mutable std::mutex mutex;
// must be declared after our mutex due to construction order!
private: interrupt* informed_by;
private: std::atomic<long> counter;
private: std::condition_variable cond;
~semaphore() throw();
public: void
public: interrupt&
get_interrupt() const { return *informed_by; }
public: void
post() {
std::lock_guard<std::mutex> lock(mutex);
cond.notify_one(); // never throws
public: unsigned long
load () const {
return counter.load();
semaphore::interrupt {
private: semaphore *forward_posts_to;
private: std::atomic<bool> triggered;
interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
forward_posts_to->informed_by = this;
public: void
trigger() {
triggered = true;
forward_posts_to->cond.notify_one(); // never throws
public: bool
is_triggered () const throw() {
return triggered.load();
public: void
reset () throw() {
semaphore::semaphore() : counter(0L), informed_by(new interrupt(this)) {}
// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw() {
delete informed_by;
semaphore::wait() {
std::unique_lock<std::mutex> lock(mutex);
if(0L==counter) {
throw interrupt_exception();
return counter>0;
Using this semaphore
, my message queue implementation now looks like this (using the semaphore instead of the std::condition_variable
I could get rid of the std::mutex
message_queue {
private: std::queue<message> queue;
private: semaphore new_msg_notification;
public: void
push(message&& msg) {
public: const message
wait_and_pop() {
auto msg(std::move(queue.front()));
return msg;
public: semaphore::interrupt&
get_interrupt() const { return new_msg_notification.get_interrupt(); }
My actor
, is now able to interrupt its thread with very low latency in its thread. The implementation presently like this:
actor {
private: message_queue
/// must be declared after incoming_msgs due to construction order!
private: semaphore::interrupt&
private: std::thread
private: std::exception_ptr
: interrupt(incoming_msgs.get_interrupt()), my_thread(
try {
catch(...) {
exception = std::current_exception();
private: virtual void
run_actor() {
private: virtual void
process(const message&) = 0;
public: void
notify(message&& msg_in) {
public: virtual
~actor() throw (interrupt_exception) {
Upvotes: 51
Views: 37506
Reputation: 1
Maybe this can works:
get rid of interrupt.
message wait_and_pop(std::condition_variable& interrupt) {
std::unique_lock<std::mutex> lock(mutex);
return !queue.empty() || stop;
if( !stop )
auto msg(std::move(queue.front()));
return msg;
return NULL; //or some 'terminate' message
In destructor, replace interrupt.notify_all()
with new_msg_notification.notify_all()
Upvotes: -1
Reputation: 154
Recently I resolved this issue with the help of single condition variable and separate Boolean variable for each producer/worker. The predicate within the wait function in consumer thread can check for these flags and take the decision which producer/worker has satisfied the condition.
Upvotes: 3
Reputation: 58741
You ask,
What is the best way to wait on multiple condition variables in C++11?
You can't, and must redesign. One thread may wait on only one condition variable (and its associated mutex) at a time. In this regard the Windows facilities for synchronization are rather richer than those of the "POSIX-style" family of synchronization primitives.
The typical approach with thread-safe queues is to enqueue a special "all done!" message, or to design a "breakable" (or "shutdown-able") queue. In the latter case, the queue's internal condition variable then protects a complex predicate: either an item is available or the queue has been broken.
In a comment you observe that
a notify_all() will have no effect if there is no one waiting
That's true but probably not relevant. wait()
ing on a condition variable also implies checking a predicate, and checking it before actually blocking for a notification. So, a worker thread busy processing a queue item that "misses" a notify_all()
will see, the next time it inspects the queue condition, that the predicate (a new item is available, or, the queue is all done) has changed.
Upvotes: 25