Reputation: 149
My C++ program hangs when I use this thread pool class:
class ThreadPool {
    unsigned threadCount;
    std::vector<std::thread> threads;
    std::list<std::function<void(void)> > queue;
    std::atomic_int jobs_left;
    std::atomic_bool bailout;
    std::atomic_bool finished;
    std::condition_variable job_available_var;
    std::condition_variable wait_var;
    std::mutex wait_mutex;
    std::mutex queue_mutex;
    std::mutex mtx;

    void Task() {
        while (!bailout) {
            next_job()();
            --jobs_left;
            wait_var.notify_one();
        }
    }

    std::function<void(void)> next_job() {
        std::function<void(void)> res;
        std::unique_lock<std::mutex> job_lock(queue_mutex);
        // Wait for a job if we don't have any.
        job_available_var.wait(job_lock, [this]()->bool { return queue.size() || bailout; });
        // Get job from the queue
        mtx.lock();
        if (!bailout) {
            res = queue.front();
            queue.pop_front();
        } else {
            // If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
            res = [] {};
            ++jobs_left;
        }
        mtx.unlock();
        return res;
    }

public:
    ThreadPool(int c)
        : threadCount(c)
        , threads(threadCount)
        , jobs_left(0)
        , bailout(false)
        , finished(false)
    {
        for (unsigned i = 0; i < threadCount; ++i)
            threads[i] = std::move(std::thread([this, i] { this->Task(); }));
    }

    ~ThreadPool() {
        JoinAll();
    }

    void AddJob(std::function<void(void)> job) {
        std::lock_guard<std::mutex> lock(queue_mutex);
        queue.emplace_back(job);
        ++jobs_left;
        job_available_var.notify_one();
    }

    void JoinAll(bool WaitForAll = true) {
        if (!finished) {
            if (WaitForAll) {
                WaitAll();
            }
            // note that we're done, and wake up any thread that's
            // waiting for a new job
            bailout = true;
            job_available_var.notify_all();
            for (auto& x : threads)
                if (x.joinable())
                    x.join();
            finished = true;
        }
    }

    void WaitAll() {
        std::unique_lock<std::mutex> lk(wait_mutex);
        if (jobs_left > 0) {
            wait_var.wait(lk, [this] { return this->jobs_left == 0; });
        }
        lk.unlock();
    }
};
When I interrupt the hung process, gdb reports that it is blocked in (std::unique_lock&, ThreadPool::WaitAll()::{lambda()#1})+58>
I'm using g++ 5.3.0 with C++14 support (-std=c++1y).
How can I avoid this problem?
I have since rewritten the class: https://github.com/edoz90/threadpool/blob/master/ThreadPool.h
Upvotes: 1
Views: 1839
Reputation: 8661
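The issue here is a race condition on your job count. You're using one mutex to protect the queue and another to protect the count, which is semantically equivalent to the queue size. Clearly the second mutex is redundant (and improperly used), as is the jobs_left variable itself. In particular, Task() decrements jobs_left and calls wait_var.notify_one() without holding wait_mutex, so WaitAll() can test its predicate, see a nonzero count, and then miss the notification just before it blocks.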
Every method that deals with the queue has to gain exclusive access to it (even JoinAll to read its size), so you should use the same queue_mutex in the three bits of code that tamper with it (JoinAll, AddJob and next_job).
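As a rough illustration of why sharing one mutex matters, here is a minimal sketch. PendingCount is a hypothetical helper, not part of the original class: the count is a plain int that is only touched while the mutex is held, so a decrement cannot slip in between the waiter's predicate check and its going to sleep.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper (not in the original code): a job counter whose
// updates and waits all go through one mutex.
class PendingCount {
    std::mutex m;
    std::condition_variable cv;
    int count = 0;  // plain int: only accessed with m held

public:
    // Called when a job is queued.
    void increment() {
        std::lock_guard<std::mutex> lk(m);
        ++count;
    }

    // Called by a worker after a job has run.
    void decrement() {
        {
            std::lock_guard<std::mutex> lk(m);
            assert(count > 0);
            --count;
        }
        // Notifying after unlocking is fine: the change itself was made
        // under the mutex, so a waiter either sees it or is already blocked.
        cv.notify_all();
    }

    // Blocks until every counted job has finished.
    void wait_zero() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return count == 0; });
    }

    // Snapshot of the current count.
    int value() {
        std::lock_guard<std::mutex> lk(m);
        return count;
    }
};
```

In the pool itself you would not need a separate helper: the same effect comes from updating the count (or just inspecting the queue) while holding queue_mutex.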
Btw, splitting the code at next_job() is pretty awkward IMO. You would avoid calling a dummy function if you handled the worker thread body in a single function.
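Something along these lines is what I have in mind. It is only a sketch: WorkQueue, worker and run_demo are names made up for the example, and the stop flag is a plain bool guarded by the mutex rather than the atomic used in the question. The worker waits, dequeues and runs the job in one loop, and simply returns when asked to stop, so no dummy job is needed.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical shared state for the sketch (names are not from the original).
struct WorkQueue {
    std::mutex m;
    std::condition_variable cv;
    std::deque<std::function<void()>> jobs;
    bool stop = false;  // guarded by m
};

// Whole worker body in one function: wait, dequeue, run, repeat.
void worker(WorkQueue& q) {
    for (;;) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lk(q.m);
            q.cv.wait(lk, [&q] { return q.stop || !q.jobs.empty(); });
            if (q.stop) return;  // leave without running anything else
            job = std::move(q.jobs.front());
            q.jobs.pop_front();
        }
        assert(job);  // an empty std::function would throw when called
        job();        // run outside the lock so others can queue work
    }
}

// Small demo: runs n jobs on one worker and returns how many executed.
int run_demo(int n) {
    WorkQueue q;
    int executed = 0;  // guarded by q.m
    std::thread t(worker, std::ref(q));
    for (int i = 0; i < n; ++i) {
        {
            std::lock_guard<std::mutex> guard(q.m);
            q.jobs.emplace_back([&q, &executed] {
                {
                    std::lock_guard<std::mutex> inner(q.m);
                    ++executed;
                }
                q.cv.notify_all();  // wake the thread waiting for completion
            });
        }
        q.cv.notify_all();  // wake the worker
    }
    // Wait for all jobs, then ask the worker to stop.
    std::unique_lock<std::mutex> lk(q.m);
    q.cv.wait(lk, [&] { return executed == n; });
    q.stop = true;
    lk.unlock();
    q.cv.notify_all();
    t.join();
    return executed;
}
```

Because one condition variable serves both the worker and the waiting caller here, the sketch uses notify_all everywhere; notify_one could wake the wrong party.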
EDIT:
As other comments have already stated, you would probably be better off getting your eyes off the code and reconsidering the problem globally for a while.
The only thing you need to protect here is the job queue, so you need only one mutex.
Then there is the problem of waking up the various actors, which requires a condition variable, since C++ basically does not give you any other usable synchronization object.
Here again you don't need more than one variable. Terminating the thread pool is equivalent to dequeueing the jobs without executing them, which can be done any which way, be it in the worker threads themselves (skipping execution if the termination flag is set) or in the JoinAll function (clearing the queue after gaining exclusive access).
Last but not least, you might want to invalidate AddJob once someone has decided to close the pool, or else you could get stuck in the destructor while someone keeps feeding in new jobs.
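A minimal sketch of those last two points, assuming a hypothetical ClosableJobs class (the name and its methods are made up for the example): add() refuses new work once the queue is closed, and close() drops whatever has not started yet, all under the one mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical job queue (names are illustrative, not from the original).
class ClosableJobs {
    std::mutex m;
    std::deque<std::function<void()>> jobs;
    bool closed = false;  // guarded by m

public:
    // Returns false instead of queueing once the pool is closing.
    bool add(std::function<void()> job) {
        assert(job);  // reject empty std::function objects early
        std::lock_guard<std::mutex> lk(m);
        if (closed) return false;
        jobs.push_back(std::move(job));
        return true;
    }

    // Marks the queue closed and drops jobs that have not started.
    // Returns how many jobs were discarded.
    std::size_t close() {
        std::lock_guard<std::mutex> lk(m);
        closed = true;
        std::size_t dropped = jobs.size();
        jobs.clear();
        return dropped;
    }

    // Number of jobs still waiting to run.
    std::size_t size() {
        std::lock_guard<std::mutex> lk(m);
        return jobs.size();
    }
};
```

In a real pool, close() would also notify the condition variable so that idle workers wake up, see the flag and exit; that part is left out to keep the sketch short.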
Upvotes: 1
Reputation: 1940
I think you need to keep it simple.
You seem to be using one mutex too many. There's queue_mutex, which you use when you add and process jobs. So why do you need a separate mutex when you are waiting on reading the queue? Why can't you just use a condition variable with the same queue_mutex to read the queue in your WaitAll() method?
I would also recommend using a lock_guard instead of the unique_lock in your WaitAll. There really isn't a need to hold the queue_mutex beyond WaitAll, even under exceptional conditions. If you exit WaitAll through an exception, the mutex should be released regardless.
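Ignore my update above. Since you are using a condition variable, you can't use a lock_guard in WaitAll, because condition_variable::wait takes a unique_lock. But if you are using a unique_lock, always go with the try_to_lock version, especially if you have more than a couple of control paths, and check owns_lock() before waiting, since wait requires the lock to be held.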
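To make the difference concrete, here is a small sketch with two hypothetical helpers (wait_until_zero and try_read are names invented for the example). The blocking constructor always owns the mutex, which is what a condition variable wait needs; the try_to_lock form may come back without the lock, so ownership has to be checked before touching shared state.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Blocking form: the constructor acquires the mutex, so cv.wait() is valid.
void wait_until_zero(std::mutex& m, std::condition_variable& cv,
                     const int& count) {
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, [&count] { return count == 0; });
    assert(lk.owns_lock());  // wait() returns with the mutex held again
}

// Non-blocking form: try_to_lock may fail, so ownership must be checked
// before reading the shared value (or before any cv.wait()).
// Returns true and writes the value only if the mutex was acquired.
bool try_read(std::mutex& m, const int& count, int& out) {
    std::unique_lock<std::mutex> lk(m, std::try_to_lock);
    if (!lk.owns_lock()) return false;
    out = count;
    return true;
}
```

Note that try_lock is allowed to fail even when nobody holds the mutex, so callers of try_read should be prepared to retry or fall back.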
Upvotes: 1