Reputation: 2087
I have been trying to figure out std::condition_variables and I am particularly confused by wait() and whether to use notify_all or notify_one.
First, I've written some code and attached it below. Here's a short explanation: Collection is a class that holds onto a bunch of Counter objects. These Counter objects have a Counter::increment() method, which needs to be called on all the objects, over and over again. To speed everything up, Collection also maintains a thread pool to distribute the work over, and sends out all the work with its Collection::increment_all() method.
These threads don't need to communicate with each other, and there are usually many more Counter objects than there are threads. It's fine if one thread processes more Counters than others, just as long as all the work gets done. Adding work to the queue is easy and only needs to be done in the "main" thread. As far as I can see, the only bad thing that can happen is if other methods (e.g. Collection::printCounts) are allowed to be called on the counters in the middle of the work being done.
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>

class Counter{
private:
    int m_count;
public:
    Counter() : m_count(0) {}
    void increment() {
        m_count ++;
    }
    int getCount() const { return m_count; }
};

class Collection{
public:
    Collection(unsigned num_threads, unsigned num_counters)
        : m_shutdown(false)
    {
        // start workers
        for(size_t i = 0; i < num_threads; ++i){
            m_threads.push_back(std::thread(&Collection::work, this));
        }
        // instantiate counters
        for(size_t j = 0; j < num_counters; ++j){
            m_counters.emplace_back();
        }
    }
    ~Collection()
    {
        m_shutdown = true;
        for(auto& t : m_threads){
            if(t.joinable()){
                t.join();
            }
        }
    }
    void printCounts() {
        // wait for work to be done
        std::unique_lock<std::mutex> lk(m_mtx);
        m_work_complete.wait(lk); // q2: do I need a while loop?
        // print all current counters
        for(const auto& cntr : m_counters){
            std::cout << cntr.getCount() << ", ";
        }
        std::cout << "\n";
    }
    void increment_all()
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_work_complete.wait(lock);
        for(size_t i = 0; i < m_counters.size(); ++i){
            m_which_counters_have_work.push(i);
        }
    }
private:
    void work()
    {
        while(!m_shutdown){
            bool action = false;
            unsigned which_counter;
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                if(m_which_counters_have_work.size()){
                    which_counter = m_which_counters_have_work.front();
                    m_which_counters_have_work.pop();
                    action = true;
                }else{
                    m_work_complete.notify_one(); // q1: notify_all
                }
            }
            if(action){
                m_counters[which_counter].increment();
            }
        }
    }
    std::vector<Counter> m_counters;
    std::vector<std::thread> m_threads;
    std::condition_variable m_work_complete;
    std::mutex m_mtx;
    std::queue<unsigned> m_which_counters_have_work;
    bool m_shutdown;
};

int main() {
    int num_threads = std::thread::hardware_concurrency()-1;
    int num_counters = 10;
    Collection myCollection(num_threads, num_counters);
    myCollection.printCounts();
    myCollection.increment_all();
    myCollection.printCounts();
    myCollection.increment_all();
    myCollection.printCounts();
    return 0;
}
I compile this on Ubuntu 18.04 with g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp
I think the code accomplishes all of those objectives, but a few questions remain:
I am using m_work_complete.wait(lk) to make sure the work is finished before I start printing all the new counts. Why do I sometimes see this written inside a while loop, or with a second argument as a lambda predicate function? These docs mention spurious wake ups. If a spurious wake up occurs, does that mean printCounts could prematurely print? If so, I don't want that. I just want to ensure the work queue is empty before I start using the numbers that should be there.
I am using m_work_complete.notify_all instead of m_work_complete.notify_one. I've read this thread, and I don't think it matters--only the main thread is going to be blocked by this. Is it faster to use notify_one just so the other threads don't have to worry about it?
Upvotes: 0
Views: 176
Reputation: 85286
std::condition_variable is not really a condition variable; it's more of a synchronization tool for reaching a certain condition. What that condition is is up to the programmer, and it should still be checked after each condition_variable wake-up, since the wait can wake up spuriously, or "too early", when the desired condition isn't yet reached.
On POSIX systems, condition_variable::wait() delegates to pthread_cond_wait, which is susceptible to spurious wake-up (see "Condition Wait Semantics" in the Rationale section). On Linux, pthread_cond_wait is in turn implemented via a futex, which is again susceptible to spurious wake-up.
So yes, you still need a flag (protected by the same mutex) or some other way to check that the work is actually complete. A convenient way to do this is by wrapping the check in a predicate and passing it to the wait() function, which then loops for you until the predicate is satisfied.
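A minimal sketch of that idea, assuming a hypothetical helper called PendingWork (neither it nor run_demo is part of the question's code). It counts items that were handed out but not yet finished, and the waiter uses the predicate overload of wait(), so a spurious wake-up simply re-checks the count and goes back to sleep. Counting finished items rather than testing for an empty queue also covers items that a worker has already popped but is still processing.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: tracks work items that are queued or in flight.
class PendingWork {
public:
    // Called by the producer before handing out n items.
    void add(std::size_t n) {
        std::lock_guard<std::mutex> lk(m_mtx);
        m_pending += n;
    }

    // Called by a worker after it has finished one item.
    void done_one() {
        std::lock_guard<std::mutex> lk(m_mtx);
        assert(m_pending > 0);
        if (--m_pending == 0) {
            m_all_done.notify_all();
        }
    }

    // Blocks until every added item has been reported done.
    void wait_until_done() {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_all_done.wait(lk, [this] { return m_pending == 0; });
        // Equivalent hand-written loop:
        //   while (m_pending != 0) { m_all_done.wait(lk); }
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_all_done;
    std::size_t m_pending = 0;
};

// Demo: each worker increments its own slot per_thread times and reports
// every increment as one finished item. Returns the sum of all slots.
int run_demo(unsigned num_threads, int per_thread) {
    PendingWork tracker;
    std::vector<int> counts(num_threads, 0);
    tracker.add(static_cast<std::size_t>(num_threads) * per_thread);

    std::vector<std::thread> threads;
    for (unsigned t = 0; t < num_threads; ++t) {
        threads.emplace_back([&tracker, &counts, t, per_thread] {
            for (int i = 0; i < per_thread; ++i) {
                ++counts[t];        // only thread t writes counts[t]
                tracker.done_one();
            }
        });
    }

    tracker.wait_until_done();      // returns only once pending == 0
    // The mutex hand-off between done_one() and wait_until_done() orders
    // the workers' writes before this read.
    int total = 0;
    for (int c : counts) total += c;
    for (auto& th : threads) th.join();
    return total;
}
```

In the question's Collection, the same pattern would mean keeping such a count next to the queue and having printCounts wait on "count is zero" instead of calling wait(lk) with no condition.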
notify_all unblocks all threads waiting on the condition variable; notify_one unblocks just one (or at least one, to be precise). If there is more than one waiting thread, and they are equivalent, i.e. any one of them can handle the condition fully, and if the condition is sufficient to let just one thread continue (as in submitting a work unit to a thread pool), then notify_one is more efficient, since it won't unblock other threads unnecessarily only for them to notice there is no work to do and go back to waiting. If you only ever have one waiter, then there is no difference between notify_one and notify_all.
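As a rough illustration of that distinction, here is a sketch of a small work queue; WorkQueue and sum_with_workers are hypothetical names made up for this example. Submitting one item calls notify_one, because any single idle worker can handle it, while closing the queue calls notify_all, because every waiting worker has to observe the shutdown.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical minimal work queue, for illustration only.
class WorkQueue {
public:
    void push(int item) {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            assert(!m_closed);
            m_items.push(item);
        }
        m_cv.notify_one();  // one new item: waking one worker is enough
    }

    void close() {
        {
            std::lock_guard<std::mutex> lk(m_mtx);
            m_closed = true;
        }
        m_cv.notify_all();  // every waiting worker must see the shutdown
    }

    // Blocks until an item is available or the queue is closed.
    // Returns false once the queue is closed and fully drained.
    bool pop(int& out) {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cv.wait(lk, [this] { return m_closed || !m_items.empty(); });
        if (m_items.empty()) return false;
        out = m_items.front();
        m_items.pop();
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::queue<int> m_items;
    bool m_closed = false;
};

// Pushes 1..num_items through the queue and returns the sum computed by
// the workers. Expects num_workers >= 1.
long long sum_with_workers(unsigned num_workers, int num_items) {
    WorkQueue q;
    std::mutex sum_mtx;
    long long sum = 0;

    std::vector<std::thread> workers;
    for (unsigned w = 0; w < num_workers; ++w) {
        workers.emplace_back([&q, &sum_mtx, &sum] {
            int item = 0;
            while (q.pop(item)) {
                std::lock_guard<std::mutex> lk(sum_mtx);
                sum += item;
            }
        });
    }

    for (int i = 1; i <= num_items; ++i) q.push(i);
    q.close();
    for (auto& w : workers) w.join();
    return sum;
}
```

Because pop() checks its predicate under the mutex, a worker that is not yet waiting when push() notifies still sees the item when it next takes the lock, so no wake-up is lost.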
Upvotes: 1
Reputation: 27115
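It's pretty simple: use notify() (notify_one() in C++) when both of the following hold:
1. Only one thread needs to react to the event (e.g., you use notify() to announce the availability of an item that a worker thread will "consume," and thereby make the item unavailable to other workers).
2. Any of the waiting threads is an acceptable one to wake (e.g., all of the threads are wait()ing in the same line of the same exact function).
Use notify_all() in all other cases.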
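One situation where notify_all() is the safer choice is when threads waiting on the same condition variable are waiting for different conditions. The sketch below assumes a hypothetical BoundedBuffer (and a hypothetical driver, transfer_sum) in which producers wait for "not full" and consumers wait for "not empty" on a single condition variable. A notify_one() after put() could wake another producer, which cannot use the new item, so the example wakes everyone and lets each waiter re-check its own predicate.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded buffer with ONE condition variable shared by two
// kinds of waiters: producers ("not full") and consumers ("not empty").
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : m_capacity(capacity) {
        assert(capacity > 0);
    }

    void put(int item) {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cv.wait(lk, [this] { return m_items.size() < m_capacity; });
        m_items.push(item);
        m_cv.notify_all();  // waiters differ, so let each re-check
    }

    int take() {
        std::unique_lock<std::mutex> lk(m_mtx);
        m_cv.wait(lk, [this] { return !m_items.empty(); });
        int item = m_items.front();
        m_items.pop();
        m_cv.notify_all();  // a blocked producer may now have room
        return item;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::queue<int> m_items;
    std::size_t m_capacity;
};

// `pairs` producers each put 1..items_each; `pairs` consumers each take
// items_each items. Returns the sum of everything the consumers received.
long long transfer_sum(unsigned pairs, int items_each) {
    BoundedBuffer buf(1);  // capacity 1 forces both kinds of waiting
    std::vector<long long> partial(pairs, 0);
    std::vector<std::thread> threads;

    for (unsigned p = 0; p < pairs; ++p) {
        threads.emplace_back([&buf, items_each] {
            for (int i = 1; i <= items_each; ++i) buf.put(i);
        });
        threads.emplace_back([&buf, &partial, p, items_each] {
            for (int i = 0; i < items_each; ++i) partial[p] += buf.take();
        });
    }

    for (auto& t : threads) t.join();
    long long total = 0;
    for (long long s : partial) total += s;
    return total;
}
```

An alternative design is to use two condition variables, one per condition, in which case notify_one() on the appropriate one is enough.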
Upvotes: 0