Reputation: 494
I have a situation where I want to create a single worker thread which performs task (essentially constructing a thread pool with a single thread). Multiple threads can post tasks to it, and the thread has a loop to run them.
Shouldn't be too hard as long as proper use of locks is made, I thought, and so my implementation is as follows:
typedef std::function<void()> MyTask;
class MyTaskPool {
public:
MyTaskPool() {
this->closed = false;
this->thread = std::thread(std::bind(&MyTaskPool::run, this));
}
~MyTaskPool() {
this->closed = true;
this->conditionVariable.notify_one();
this->thread.join();
}
void post(MyTask task) {
{
std::lock_guard<std::mutex>(this->mutex);
this->tasks.push(task);
}
this->conditionVariable.notify_one();
}
private:
bool closed;
std::mutex mutex;
std::condition_variable conditionVariable;
std::thread thread;
std::deque<MyTask> tasks;
void run() {
while (true) {
boost::optional<MyTask> task;
{
std::lock_guard<std::mutex>(this->mutex);
if (this->closed)
return;
if (this->tasks.size() > 0) {
task = this->tasks.front();
this->tasks.pop_front();
}
}
if (task.is_initialized()) {
task();
} else {
std::unique_lock<std::mutex> lock(this->mutex);
this->conditionVariable.wait(lock);
}
}
}
}
Built it, tested it, it works. And it's easy to use; I can create a new MyTaskPool, and post tasks to it with a simple lambda expression. Great! Except... after using this thing for an extended period of time, it suddenly breaks: this->tasks.front() fails - with an error telling me the iterator cannot be dereferenced. My tasks deque is.. empty? Both the code that adds and removes from the deque is guarded by a lock, and thus this should not happen.
Can anyone see the error - which I'm pretty sure is a race condition of some kind?
The actual code is slightly more complex, as it does some handling with each task, but that should not be relevant to this example.
Upvotes: 0
Views: 532
Reputation: 494
The error is actually suble and hardly noticeable: the lock that is used to guard the retrieval of the task is not stored in a local variable. This causes it to be destructed - and thus released - immediately.
For people that seek to implement something alike, here's how my code works now, with the comments of David and Sam integrated into it:
typedef std::function<void(MySharedResource)> MyTask;
class MyTaskPool {
public:
MyTaskPool() {
this->closed = false;
this->thread = std::thread(std::bind(&MyTaskPool::run, this));
}
~MyTaskPool() {
{
std::lock_guard<std::mutex> lock(this->mutex);
this->closed = true;
}
this->conditionVariable.notify_one();
this->thread.join();
}
void post(MyTask task) {
{
std::lock_guard<std::mutex> lock(this->mutex);
this->tasks.push(task);
}
this->conditionVariable.notify_one();
}
private:
bool closed;
std::mutex mutex;
std::condition_variable conditionVariable;
std::thread thread;
std::deque<MyTask> tasks;
void run() {
while (true) {
boost::optional<MyTask> task;
{
std::unique_lock<std::mutex> lock(this->mutex);
if (this->closed)
return;
if (this->tasks.size() > 0) {
task = this->tasks.front();
this->tasks.pop_front();
} else {
this->conditionVariable.wait(lock);
}
}
if (task.is_initialized()) {
task();
}
}
}
}
Upvotes: 1