Stan Hebben
Stan Hebben

Reputation: 494

Race condition in c++ task thread with deque

I have a situation where I want to create a single worker thread which performs task (essentially constructing a thread pool with a single thread). Multiple threads can post tasks to it, and the thread has a loop to run them.

Shouldn't be too hard as long as proper use of locks is made, I thought, and so my implementation is as follows:

typedef std::function<void()> MyTask;

class MyTaskPool {
public:
    MyTaskPool() {
        this->closed = false;
        this->thread = std::thread(std::bind(&MyTaskPool::run, this));
    }

    ~MyTaskPool() {
        this->closed = true;
        this->conditionVariable.notify_one();
        this->thread.join();
    }

    void post(MyTask task) {
        {
            std::lock_guard<std::mutex>(this->mutex);
            this->tasks.push(task);
        }
        this->conditionVariable.notify_one();
    }
private:
    bool closed;
    std::mutex mutex;
    std::condition_variable conditionVariable;
    std::thread thread;
    std::deque<MyTask> tasks;

    void run() {
        while (true) {
            boost::optional<MyTask> task;
            {
                std::lock_guard<std::mutex>(this->mutex);
                if (this->closed)
                    return;

                if (this->tasks.size() > 0) {
                    task = this->tasks.front();
                    this->tasks.pop_front();
                }
            }

            if (task.is_initialized()) {
                task();
            } else {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->conditionVariable.wait(lock);
            }
        }
    }
}

Built it, tested it, it works. And it's easy to use; I can create a new MyTaskPool, and post tasks to it with a simple lambda expression. Great! Except... after using this thing for an extended period of time, it suddenly breaks: this->tasks.front() fails - with an error telling me the iterator cannot be dereferenced. My tasks deque is.. empty? Both the code that adds and removes from the deque is guarded by a lock, and thus this should not happen.

Can anyone see the error - which I'm pretty sure is a race condition of some kind?

The actual code is slightly more complex, as it does some handling with each task, but that should not be relevant to this example.

Upvotes: 0

Views: 532

Answers (1)

Stan Hebben
Stan Hebben

Reputation: 494

The error is actually suble and hardly noticeable: the lock that is used to guard the retrieval of the task is not stored in a local variable. This causes it to be destructed - and thus released - immediately.

For people that seek to implement something alike, here's how my code works now, with the comments of David and Sam integrated into it:

typedef std::function<void(MySharedResource)> MyTask;

class MyTaskPool {
public:
    MyTaskPool() {
        this->closed = false;
        this->thread = std::thread(std::bind(&MyTaskPool::run, this));
    }

    ~MyTaskPool() {
        {
            std::lock_guard<std::mutex> lock(this->mutex);
            this->closed = true;
        }
        this->conditionVariable.notify_one();
        this->thread.join();
    }

    void post(MyTask task) {
        {
            std::lock_guard<std::mutex> lock(this->mutex);
            this->tasks.push(task);
        }
        this->conditionVariable.notify_one();
    }
private:
    bool closed;
    std::mutex mutex;
    std::condition_variable conditionVariable;
    std::thread thread;
    std::deque<MyTask> tasks;

    void run() {
        while (true) {
            boost::optional<MyTask> task;
            {
                std::unique_lock<std::mutex> lock(this->mutex);
                if (this->closed)
                    return;

                if (this->tasks.size() > 0) {
                    task = this->tasks.front();
                    this->tasks.pop_front();
                } else {
                    this->conditionVariable.wait(lock);
                }
            }

            if (task.is_initialized()) {
                task();
            }
        }
    }
}

Upvotes: 1

Related Questions