I'm trying to write a dirt-simple thread pool to learn how they work under the hood. I've run into a problem, though: when I call notify_all() on my condition_variable, it only wakes up one thread in my pool.
Everything else works fine. I've queued up 900 jobs, each with a decent payload. The one thread that wakes up consumes all those jobs, then goes back to sleep. On the next loop, this all happens again.
The problem is that only one thread does the work! What have I messed up in structuring this?
ThreadPool.h:
#pragma once
#include <cstdint>
#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>

// Job (with _jobFn and _args members) is declared elsewhere and must be visible here.
class ThreadPool
{
    friend void __stdcall ThreadFunc();
public:
    static ThreadPool & GetInstance()
    {
        static ThreadPool sInstance;
        return (sInstance);
    }
public:
    void AddJob(Job * job);
    void DoAllJobs();
private:
    Job * GetJob();
private:
    const static uint32_t ThreadCount = 8;
    std::mutex JobMutex;
    std::stack<Job *> Jobs;
    volatile std::atomic<int> JobWorkCounter;
    std::mutex SharedLock;
    std::thread Threads[ThreadCount];
    std::condition_variable Signal;
private:
    ThreadPool();
    ~ThreadPool();
public:
    ThreadPool(ThreadPool const &) = delete;
    void operator = (ThreadPool const &) = delete;
};
ThreadPool.cpp:
#include "ThreadPool.h"
#include <windows.h> // Sleep()

void __stdcall ThreadFunc()
{
    std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
    while (true)
    {
        ThreadPool::GetInstance().Signal.wait(lock);
        while (Job * job = ThreadPool::GetInstance().GetJob())
        {
            job->_jobFn(job->_args);
            ThreadPool::GetInstance().JobWorkCounter--;
        }
    }
}

ThreadPool::ThreadPool()
{
    JobWorkCounter = 0;
    for (uint32_t i = 0; i < ThreadCount; ++i)
        Threads[i] = std::thread(ThreadFunc);
}

ThreadPool::~ThreadPool()
{
}

void ThreadPool::AddJob(Job * job)
{
    JobWorkCounter++;
    JobMutex.lock();
    {
        Jobs.push(job);
    }
    JobMutex.unlock();
}

void ThreadPool::DoAllJobs()
{
    Signal.notify_all();
    while (JobWorkCounter > 0)
    {
        Sleep(0);
    }
}

Job * ThreadPool::GetJob()
{
    Job * return_value = nullptr;
    JobMutex.lock();
    {
        if (Jobs.empty() == false)
        {
            return_value = Jobs.top();
            Jobs.pop();
        }
    }
    JobMutex.unlock();
    return (return_value);
}
Thanks for any help! Sorry for the big code post.
Unless you want to design a new pattern, the easy "monkey-see, monkey-do" way to work with condition variables is to always use three things together:
a condition variable, a mutex, and a message.
std::condition_variable cv;
mutable std::mutex m;
your_message_type message;
Then there are 3 patterns to follow. Send one message:
std::unique_lock l{m}; // C++17, don't need to pass type
set_message_data(message);
cv.notify_one();
Send lots of messages:
std::unique_lock l{m};
set_lots_of_message_data(message);
cv.notify_all();
And finally, wait for and process messages:
while(true) {
    auto data = [&]()->std::optional<data_to_process>{
        std::unique_lock l{m};
        cv.wait( l, [&]{ return done() || there_is_a_message(message); } );
        if (done()) return {};
        return get_data_to_process(message);
    }();
    if (!data) break;
    auto& data_to_process = *data;
    // process the data
}
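Putting the three patterns together, here is a minimal runnable sketch (C++17). The message is a hypothetical Message struct holding a std::deque<int> plus a done flag, and send_one, send_many, finish, consume and run_demo are illustrative names, not part of any library. One small departure from the pattern above: the consumer keeps draining queued items after done is set and only stops once the queue is empty, so no data is dropped.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>

// Hypothetical message type: a queue of ints plus a "done" flag.
struct Message {
    std::deque<int> items;
    bool done = false;
};

// The three things: a condition variable, a mutex, and the message.
std::condition_variable cv;
std::mutex m;
Message message;

// Send one message: change the data while holding the lock, then notify one waiter.
void send_one(int value) {
    std::unique_lock l{m};
    message.items.push_back(value);
    cv.notify_one();
}

// Send lots of messages: change the data while holding the lock, then wake everyone.
void send_many(const std::vector<int>& values) {
    std::unique_lock l{m};
    message.items.insert(message.items.end(), values.begin(), values.end());
    cv.notify_all();
}

// Mark the stream as finished so idle consumers stop waiting.
void finish() {
    std::unique_lock l{m};
    message.done = true;
    cv.notify_all();
}

// Wait for and process messages; returns the sum of the values this thread consumed.
long long consume() {
    long long sum = 0;
    while (true) {
        auto data = [&]() -> std::optional<int> {
            std::unique_lock l{m};
            cv.wait(l, [&] { return message.done || !message.items.empty(); });
            if (message.items.empty()) return {};  // done and fully drained
            int value = message.items.front();
            message.items.pop_front();
            return value;
        }();  // the lock is released when the lambda returns
        if (!data) break;
        sum += *data;  // "process" the data without holding the lock
    }
    return sum;
}

// Starts four consumers, sends 1 and then 2..5, and returns the grand total.
long long run_demo() {
    {
        std::unique_lock l{m};
        message = Message{};  // reset so the demo can be run more than once
    }
    std::vector<long long> sums(4, 0);
    std::vector<std::thread> consumers;
    for (std::size_t i = 0; i < sums.size(); ++i)
        consumers.emplace_back([&sums, i] { sums[i] = consume(); });
    send_one(1);
    send_many({2, 3, 4, 5});
    finish();
    for (auto& t : consumers) t.join();
    long long total = 0;
    for (long long s : sums) total += s;
    assert(message.items.empty());  // everything was consumed
    return total;
}
```

run_demo() returns 15 (1 + 2 + 3 + 4 + 5) regardless of which consumer picked up which value.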
There is some flexibility, but there are a number of rules to follow:
1. Between setting the message data and notifying, you must have the mutex locked.
2. You should always use the lambda version of wait; doing it without the lambda version means you are doing it wrong 99 times out of 100.
3. The message data should be enough to determine whether a task should be done, were it not for pesky threads and locks and stuff.
4. Only use RAII means to lock/unlock mutexes. Correctness without that is nearly impossible.
5. Don't hold the lock while processing stuff. Hold the lock long enough to get the data to process, then drop the lock.
Your code violates 2, 3, 4, 5. I think you don't screw up 1.
That said, modern condition variable implementations are actually highly efficient if you hold the mutex while you notify.
I think the most obvious symptoms come from breaking the rule about not holding the lock while processing: your worker threads are always holding a lock, so only one can make progress. The other violations cause other issues in your code.
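As an illustration of why the lambda version of wait matters, here is a small sketch (late_waiter_still_proceeds is a made-up name). The predicate overload wait(lock, pred) is specified to behave like while (!pred()) wait(lock);, so it checks the shared state before sleeping. A notification sent before the thread reaches wait() is therefore not lost, which a bare wait(lock), like the one in the question's ThreadFunc, cannot guarantee.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// cv.wait(lock, pred) behaves like:  while (!pred()) cv.wait(lock);
// The waiter checks the shared state before sleeping and again after every
// wakeup, which makes it immune to spurious wakeups and to a notify that
// happened before the waiter got there (a "lost wakeup").
bool late_waiter_still_proceeds() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // the "message"

    // The producer sets the message and notifies before anyone is waiting.
    {
        std::lock_guard<std::mutex> lock(m);
        ready = true;
        cv.notify_all();  // no waiters yet, so this wakes nobody
    }

    bool saw_ready = false;
    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        // A bare cv.wait(lock) here would sleep until some later
        // notification, which never comes. The predicate sees ready == true
        // and returns immediately without blocking.
        cv.wait(lock, [&] { return ready; });
        saw_ready = ready;
    });
    waiter.join();
    return saw_ready;
}
```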
Now, going beyond this relatively simple pattern is possible. But once you do, you really need at least a basic understanding of the C++ threading model, and you cannot learn by writing code and seeing if it works. You have to sit down with the specs, read them over, understand what the condition variable does in the standard, understand what mutexes do, write some code, sit down and work out why it doesn't work, find someone else who wrote similar code that had problems, work out how that person debugged it and found the mistake, go back to your code and find the same mistake, adjust it, and repeat.
This is why I write primitives using condition variables and don't mix condition variables in with other logic (like, say, maintaining a thread pool).
Write a thread-safe queue. All it does is maintain a queue and notify consumers when there is data to read.
The simplest one has 3 member variables: a mutex, a condition variable, and a std::queue.
Then augment it with shutdown capabilities -- now pop has to return an optional or have some other failure path.
Your task requires batching up tasks before firing them all off. Are you sure you want that? If so, what I'd do is add a "push multiple tasks" interface to the thread-safe queue, keep the "not ready" tasks in a non-thread-safe queue, and only push them all once you want threads to consume them.
The "thread pool" then consumes the thread-safe queue. Because we wrote the thread-safe queue separately, we have half as many moving parts, which means four times fewer interactions between them.
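A sketch of that design, assuming jobs are std::function<void()> objects: a ThreadSafeQueue (mutex, condition variable, std::queue) with shutdown, an optional-returning pop, and a push_many batch interface, plus a ThreadPool that does nothing but consume it. All names here are hypothetical, chosen for the example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical ThreadSafeQueue: a mutex, a condition variable, and a std::queue,
// plus shutdown and a batched push.
template <class T>
class ThreadSafeQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(m_);
        q_.push(std::move(value));
        cv_.notify_one();
    }
    // Hand over a whole batch at once and wake every consumer.
    void push_many(std::vector<T> values) {
        std::lock_guard<std::mutex> lock(m_);
        for (auto& v : values) q_.push(std::move(v));
        cv_.notify_all();
    }
    // After shutdown, consumers drain what is left and then pop() returns nullopt.
    void shutdown() {
        std::lock_guard<std::mutex> lock(m_);
        closed_ = true;
        cv_.notify_all();
    }
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return closed_ || !q_.empty(); });
        if (q_.empty()) return std::nullopt;  // closed and drained
        T value = std::move(q_.front());
        q_.pop();
        return value;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<T> q_;
    bool closed_ = false;
};

// The "thread pool" is just workers consuming the queue; it contains no
// condition-variable logic of its own.
class ThreadPool {
public:
    explicit ThreadPool(unsigned thread_count) {
        for (unsigned i = 0; i < thread_count; ++i)
            workers_.emplace_back([this] {
                while (auto job = jobs_.pop()) (*job)();  // runs with no lock held
            });
    }
    ~ThreadPool() {
        jobs_.shutdown();
        for (auto& t : workers_) t.join();
    }
    void submit_all(std::vector<std::function<void()>> batch) {
        jobs_.push_many(std::move(batch));
    }

private:
    ThreadSafeQueue<std::function<void()>> jobs_;  // constructed before workers_
    std::vector<std::thread> workers_;
};

// Stage 900 jobs in a plain (non-thread-safe) vector, release them as one
// batch, and count how many ran.
int run_batch_demo() {
    std::atomic<int> finished{0};
    std::vector<std::function<void()>> pending;
    for (int i = 0; i < 900; ++i)
        pending.push_back([&finished] { finished.fetch_add(1); });
    {
        ThreadPool pool(8);
        pool.submit_all(std::move(pending));
    }  // ~ThreadPool shuts the queue down; workers drain it and are joined
    return finished.load();
}
```

Because pop() only reports failure once the queue is both shut down and empty, destroying the pool still runs every staged job, so run_batch_demo() returns 900.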
Threading code is hard. Respect it.
std::unique_lock<std::mutex> lock(ThreadPool::GetInstance().SharedLock);
Each thread acquires this mutex first.
ThreadPool::GetInstance().Signal.wait(lock);
All threads will receive the signal from the condition variable when the main thread calls notify_all(), but you are forgetting one crucial detail: after being woken by a condition variable notification, the mutex gets automatically re-locked. That's how wait() works; read its documentation in your C++ book or the manual pages. Only one thread will be able to do that. All the other threads that wake up will also try to lock the mutex, but only the first one wins the race; all the other threads will sleep and continue dreaming.
A thread that has been notified will not return from wait() until it has successfully relocked the mutex, too.
To return from wait(), two things must happen: the thread gets notified by the condition variable, and the thread successfully relocks the mutex. wait() atomically unlocks the mutex and waits on the condition variable, and relocks the mutex when it is notified.
So the lucky thread locks the mutex, drains the queue of all the jobs, then goes back to the top of the loop and calls wait() again. This unlocks the mutex, and now some other lucky thread, which has been notified but is waiting patiently for its chance to bask in sunlight and glory, will be able to lock the mutex. In this manner, all the other threads take turns, elephant-style (one after another), waking up, checking the job queue, finding nothing there, and going back to sleep.
This is the reason why you're seeing this behavior.
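The turn-taking is easy to reproduce in isolation. This sketch (threads_that_did_work is a hypothetical name) copies the question's mistake of draining the queue while still holding the mutex, and counts how many threads ran any jobs. Unlike the question, it waits with a predicate so that a thread that reaches wait() late does not hang.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Mimics the question's worker: wait, then drain the queue while still
// holding the mutex. Every thread is woken by notify_all(), but wait()
// only returns once that thread has re-locked the mutex, so the first one
// through takes every job and the rest find the queue empty.
// Returns how many threads ended up running at least one job.
int threads_that_did_work(int thread_count, int job_count) {
    std::mutex m;
    std::condition_variable cv;
    std::queue<int> jobs;
    bool go = false;
    int threads_with_jobs = 0;

    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i)
        threads.emplace_back([&] {
            std::unique_lock<std::mutex> lock(m);
            // Predicate added (unlike the question) so a thread that starts
            // late does not miss the single notification and hang.
            cv.wait(lock, [&] { return go; });  // returns with m locked
            int mine = 0;
            while (!jobs.empty()) {  // "processing" with the lock held
                jobs.pop();
                ++mine;
            }
            if (mine > 0) ++threads_with_jobs;  // still under the lock
        });

    {
        std::lock_guard<std::mutex> lock(m);
        for (int j = 0; j < job_count; ++j) jobs.push(j);
        go = true;
        cv.notify_all();
    }
    for (auto& t : threads) t.join();
    return threads_with_jobs;
}
```

With 8 threads and 900 jobs it reports 1 every time: whichever thread re-locks the mutex first takes everything.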
There are three basic things that must be done to make the shown code thread safe.
1) You do not need two mutexes; one will be perfectly sufficient.
2) Before wait()ing on the condition variable, check whether there's something in the job queue. If there is, remove it, unlock the mutex, and then do the job.
3) wait() only if the job queue is empty. When wait() returns, the mutex is locked again; check whether the job queue is still empty (at this point you're not guaranteed that it's non-empty, only that it may be non-empty).
You only need one mutex to protect access to the non-thread safe job queue, and to wait on the condition variable.
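Putting those three steps together, a single-mutex version of the worker loop might look like the sketch below. It assumes a hypothetical Job with a function pointer _jobFn and a void* _args (the question doesn't show the real type), drops the singleton and the free ThreadFunc, starts jobs as soon as they are added instead of waiting for DoAllJobs(), and replaces the Sleep(0) spin with a second condition variable (Done) on the same mutex. WaitForAll, Pending, Stopping and RunJobs are illustrative names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <stack>
#include <thread>
#include <vector>

// Hypothetical Job layout; the question never shows its real definition.
struct Job {
    void (*_jobFn)(void*);
    void* _args;
};

class ThreadPool {
public:
    ThreadPool() {
        for (auto& t : Threads) t = std::thread([this] { WorkerLoop(); });
    }
    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(Mutex);
            Stopping = true;
            Signal.notify_all();
        }
        for (auto& t : Threads) t.join();
    }
    void AddJob(Job* job) {
        std::lock_guard<std::mutex> lock(Mutex);  // the one mutex guards all state
        Jobs.push(job);
        ++Pending;
        Signal.notify_one();
    }
    // Blocks until every added job has finished; replaces the Sleep(0) spin.
    void WaitForAll() {
        std::unique_lock<std::mutex> lock(Mutex);
        Done.wait(lock, [this] { return Pending == 0; });
    }

private:
    void WorkerLoop() {
        std::unique_lock<std::mutex> lock(Mutex);
        while (true) {
            // The predicate checks the queue first; the thread only sleeps
            // if it is empty, and re-checks after every wakeup.
            Signal.wait(lock, [this] { return Stopping || !Jobs.empty(); });
            if (Jobs.empty()) return;  // stopping and nothing left to do
            Job* job = Jobs.top();
            Jobs.pop();
            lock.unlock();  // run the job without holding the mutex
            job->_jobFn(job->_args);
            lock.lock();
            if (--Pending == 0) Done.notify_all();
        }
    }

    static constexpr std::uint32_t ThreadCount = 8;
    std::mutex Mutex;
    std::condition_variable Signal;  // "a job was added" or "stopping"
    std::condition_variable Done;    // "Pending dropped to zero"
    std::stack<Job*> Jobs;
    int Pending = 0;
    bool Stopping = false;
    std::thread Threads[ThreadCount];  // default-constructed, started in the constructor body
};

static void Increment(void* arg) { ++*static_cast<std::atomic<int>*>(arg); }

// Queues job_count jobs that each bump a shared counter, waits, returns the count.
int RunJobs(int job_count) {
    std::atomic<int> counter{0};
    std::vector<Job> jobs(job_count, Job{&Increment, &counter});
    ThreadPool pool;  // destroyed before jobs and counter
    for (auto& job : jobs) pool.AddJob(&job);
    pool.WaitForAll();
    return counter.load();
}
```

The predicate passed to Signal.wait covers steps 2 and 3 at once: it looks at the queue before sleeping and again after every wakeup, and the job itself runs with the mutex released.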