Reputation: 149
I am currently struggling with writing lockless code for atomic variables for a C++11 thread pool class based on https://github.com/nbsdx/ThreadPool. Here is my modified class:
class ThreadPool
{
public:
explicit ThreadPool(int threadCount) :
_jobsLeft(0),
_bailout(false)
{
_threads = std::vector<std::thread>(threadCount);
for (int index = 0; index < threadCount; ++index)
{
_threads[index] = std::move(std::thread([this]
{
this->Task();
}));
}
}
void AddJob(std::function<void(void)> job)
{
{
std::lock_guard<std::mutex> lock(_queueMutex);
_queue.emplace(job);
}
{
std::lock_guard<std::mutex> lock(_jobsLeftMutex);
++_jobsLeft;
}
_jobAvailableVar.notify_one();
}
void JoinAll()
{
if (!_bailout)
{
_bailout = true;
_jobAvailableVar.notify_all();
for (auto& x : _threads)
{
if (x.joinable())
{
x.join();
}
}
}
}
void WaitAll()
{
std::unique_lock<std::mutex> lock(_jobsLeftMutex);
if (_jobsLeft > 0)
{
_waitVar.wait(lock, [this]
{
return _jobsLeft == 0;
});
}
}
private:
void Task()
{
while (!_bailout)
{
std::function<void(void)> job;
{
std::unique_lock<std::mutex> lock(_queueMutex);
_jobAvailableVar.wait(lock, [this]
{
return _queue.size() > 0 || _bailout;
});
if (_bailout)
{
return;
}
job = _queue.front();
_queue.pop();
}
job();
{
std::lock_guard<std::mutex> lock(_jobsLeftMutex);
--_jobsLeft;
}
_waitVar.notify_one();
}
}
std::vector<std::thread> _threads;
std::queue<std::function<void(void)>> _queue;
int _jobsLeft;
std::atomic<bool> _bailout;
std::condition_variable _jobAvailableVar;
std::condition_variable _waitVar;
std::mutex _jobsLeftMutex;
std::mutex _queueMutex;
};
I am puzzled about the following and would really appreciate any pointers:
_bailout
really atomic? I played around with atomic_flag
but could not get it to work in this scenario._jobsLeft
behave correctly using an atomic<int>
without all those locks? I had an issue with the original thread pool where WaitAll()
would not correctly return in rare cases. It works perfectly fine now but I feel like it wastes performance.EDIT: Final version with normal locks is available here: https://github.com/stfx/ThreadPool2
Upvotes: 2
Views: 196
Reputation: 121
Look at the atomic methods: http://en.cppreference.com/w/cpp/atomic/atomic
You should use the load method to atomically read the value. However, be aware that the following is not atomic.
if (!_bailout)
{
_bailout = true;
There is a method to perform these comparison and value swaps. See compare_exchange_weak method. For the jobCount you can use atomic. The ++ and -- are atomic.
However, be aware that the atomic is only a single method call, after the call the situation may change. You still need a synchronized queue and for that you need a lock. It does not need to be a standard OS lock, you can create a lock using atomic variable (using the store and compare_exchange_weak methods). See the following post: https://webkit.org/blog/6161/locking-in-webkit/
Upvotes: 1