notdodo

Reputation: 149

Thread pool stuck on wait condition

My C++ program hangs when I use this thread pool class:

class ThreadPool {

    unsigned threadCount;
    std::vector<std::thread> threads;
    std::list<std::function<void(void)> > queue;

    std::atomic_int jobs_left;
    std::atomic_bool bailout;
    std::atomic_bool finished;
    std::condition_variable job_available_var;
    std::condition_variable wait_var;
    std::mutex wait_mutex;
    std::mutex queue_mutex;
    std::mutex mtx;

    void Task() {
        while (!bailout) {
            next_job()();
            --jobs_left;
            wait_var.notify_one();
        }
    }

    std::function<void(void)> next_job() {
        std::function<void(void)> res;
        std::unique_lock<std::mutex> job_lock(queue_mutex);

        // Wait for a job if we don't have any.
        job_available_var.wait(job_lock, [this]()->bool { return queue.size() || bailout; });

        // Get job from the queue
        mtx.lock();
        if (!bailout) {
            res = queue.front();
            queue.pop_front();
        } else {
            // If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
            res = [] {};
            ++jobs_left;
        }
        mtx.unlock();
        return res;
    }

public:
    ThreadPool(int c)
        : threadCount(c)
        , threads(threadCount)
        , jobs_left(0)
        , bailout(false)
        , finished(false)
    {
        for (unsigned i = 0; i < threadCount; ++i)
            threads[i] = std::move(std::thread([this, i] { this->Task(); }));
    }

    ~ThreadPool() {
        JoinAll();
    } 

    void AddJob(std::function<void(void)> job) {
        std::lock_guard<std::mutex> lock(queue_mutex);
        queue.emplace_back(job);
        ++jobs_left;
        job_available_var.notify_one();
    }

    void JoinAll(bool WaitForAll = true) {
        if (!finished) {
            if (WaitForAll) {
                WaitAll();
            }

            // note that we're done, and wake up any thread that's
            // waiting for a new job
            bailout = true;
            job_available_var.notify_all();

            for (auto& x : threads)
                if (x.joinable())
                    x.join();
            finished = true;
        }
    }

    void WaitAll() {
        std::unique_lock<std::mutex> lk(wait_mutex);
        if (jobs_left > 0) {
            wait_var.wait(lk, [this] { return this->jobs_left == 0; });
        }
        lk.unlock();
    }
};

When I interrupt the hung program, gdb reports that it is blocked in (std::unique_lock&, ThreadPool::WaitAll()::{lambda()#1})+58>

I'm using g++ v5.3.0 with C++14 support (-std=c++1y).

How can I avoid this problem?

Update

I've rewritten the class: https://github.com/edoz90/threadpool/blob/master/ThreadPool.h

Upvotes: 1

Views: 1839

Answers (2)

kuroi neko

Reputation: 8661

The issue here is a race condition on your job count. You're using one mutex to protect the queue and another to protect the count, which is semantically equivalent to the queue size. Clearly the second mutex is redundant (and improperly used), as is the jobs_left variable itself.

Every method that deals with the queue has to gain exclusive access to it (even JoinAll to read its size), so you should use the same queue_mutex in the three bits of code that tamper with it (JoinAll, AddJob and next_job).
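As a rough illustration of the single-mutex rule above, here is a minimal sketch. `JobQueue`, `RunOne`, `unfinished` and `idle_var` are hypothetical names, not part of the original class. One way the original can hang: `--jobs_left` and `wait_var.notify_one()` run without holding `wait_mutex`, so the notification can arrive after `WaitAll` has tested its predicate but before it actually blocks, and is then lost. Updating the count under the same mutex the waiter holds closes that window.

```cpp
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical helper, not the original class: the queue and the count of
// unfinished jobs are guarded by the same mutex.
class JobQueue {
    std::list<std::function<void()>> queue;
    int unfinished = 0;               // queued + currently running jobs
    std::mutex queue_mutex;           // guards queue and unfinished
    std::condition_variable idle_var; // signalled whenever a job finishes

public:
    void Add(std::function<void()> job) {
        std::lock_guard<std::mutex> lock(queue_mutex);
        queue.push_back(std::move(job));
        ++unfinished;
    }

    // Pops one job and runs it on the calling thread.
    // Returns false if the queue was empty.
    bool RunOne() {
        std::function<void()> job;
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            if (queue.empty()) return false;
            job = std::move(queue.front());
            queue.pop_front();
        }
        job();  // run without holding the lock
        {
            // Decrement under the mutex: a waiter either sees the new value
            // in its predicate or is already blocked when we notify.
            std::lock_guard<std::mutex> lock(queue_mutex);
            --unfinished;
        }
        idle_var.notify_all();
        return true;
    }

    void WaitAll() {
        std::unique_lock<std::mutex> lock(queue_mutex);
        idle_var.wait(lock, [this] { return unfinished == 0; });
    }
};
```

With this layout the count no longer needs to be atomic, since every access happens with `queue_mutex` held.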

Btw, splitting the code at next_job() is pretty awkward IMO. You would avoid calling a dummy function if you handled the worker thread body in a single function.

EDIT:

As other comments have already stated, you would probably be better off getting your eyes off the code and reconsidering the problem globally for a while.

The only thing you need to protect here is the job queue, so you need only one mutex.

Then there is the problem of waking up the various actors, which requires a condition variable since C++ basically does not give you any other useable synchronization object.

Here again you don't need more than one condition variable. Terminating the thread pool is equivalent to dequeueing the jobs without executing them, which can be done either in the worker threads themselves (skipping execution if the termination flag is set) or in the JoinAll function (clearing the queue after gaining exclusive access).

Last but not least, you might want to invalidate AddJob once someone decided to close the pool, or else you could get stuck in the destructor while someone keeps feeding in new jobs.
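Putting these points together, a pool along these lines could look like the sketch below. It is only an illustration under assumptions: `SimplePool`, `Worker`, `Close`, `running` and `closed` are made-up names, the worker body lives in a single function, and `AddJob` returns `false` once the pool is closed. Exceptions thrown by jobs and concurrent calls to `Close` are not handled.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical rewrite for illustration; names differ from the original class.
class SimplePool {
    std::list<std::function<void()>> queue;
    std::vector<std::thread> threads;
    std::mutex m;                 // the only mutex: guards queue, running, closed
    std::condition_variable cv;   // the only condition variable
    unsigned running = 0;         // jobs currently executing
    bool closed = false;

    void Worker() {
        std::unique_lock<std::mutex> lock(m);
        for (;;) {
            cv.wait(lock, [this] { return closed || !queue.empty(); });
            if (queue.empty()) return;  // only reachable when closed and drained
            auto job = std::move(queue.front());
            queue.pop_front();
            ++running;
            lock.unlock();
            job();                      // run without holding the lock
            lock.lock();
            --running;
            cv.notify_all();            // shared cv: may wake workers or waiters
        }
    }

public:
    explicit SimplePool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            threads.emplace_back([this] { Worker(); });
    }

    ~SimplePool() { Close(); }

    // Returns false once the pool has been closed.
    bool AddJob(std::function<void()> job) {
        assert(job != nullptr);
        {
            std::lock_guard<std::mutex> lock(m);
            if (closed) return false;
            queue.push_back(std::move(job));
        }
        cv.notify_all();  // notify_one could wake a WaitAll caller instead
        return true;
    }

    void WaitAll() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return queue.empty() && running == 0; });
    }

    // With wait_for_all == false, pending jobs are dropped without running.
    void Close(bool wait_for_all = true) {
        {
            std::lock_guard<std::mutex> lock(m);
            if (!wait_for_all) queue.clear();
            closed = true;
        }
        cv.notify_all();
        for (auto& t : threads)
            if (t.joinable()) t.join();
    }
};
```

Because workers and `WaitAll` callers share one condition variable, the sketch uses `notify_all` everywhere. That costs some spurious wake-ups but keeps the reasoning simple.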

Upvotes: 1

pcodex

Reputation: 1940

I think you need to keep it simple. You seem to be using one mutex too many. There's queue_mutex, and you use that when you add and process jobs.

Now what's the need for another separate mutex when you are waiting on reading the queue?

Why can't you use just a condition variable with the same queue_mutex to read the queue in your WaitAll() method?

Update

I would also recommend using a lock_guard instead of the unique_lock in your WaitAll. There really isn't a need to lock the queue_mutex beyond the WaitAll under exceptional conditions. If you exit the WaitAll exceptionally it should be released regardless.

Update2

Ignore my Update above. Since you are using a condition variable, you can't use a lock_guard in WaitAll. But if you are using a unique_lock, always go with the try_to_lock version, especially if you have more than a couple of control paths.
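To illustrate the lock types involved, here is a small sketch using a hypothetical one-shot `Flag` class (not from the question). `std::condition_variable::wait` takes a `std::unique_lock<std::mutex>&` because it has to unlock the mutex while blocked and relock it before returning. A `std::lock_guard` is enough on the side that only sets state and notifies. Note that a `unique_lock` built with `std::try_to_lock` may not own the mutex, and calling `wait` with a lock that is not owned is undefined behaviour, so `owns_lock()` would have to be checked first in that case.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical one-shot flag, used only to show which lock type goes where.
class Flag {
    std::mutex m;
    std::condition_variable cv;
    bool set = false;

public:
    void Set() {
        {
            // No waiting here, so a lock_guard is sufficient.
            std::lock_guard<std::mutex> lock(m);
            set = true;
        }
        cv.notify_all();
    }

    void Wait() {
        // wait() needs a unique_lock that currently owns the mutex.
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return set; });
        // The destructor releases the mutex, also when an exception
        // propagates, so an explicit unlock() is not needed.
    }

    bool IsSet() {
        std::lock_guard<std::mutex> lock(m);
        return set;
    }
};
```

The same reasoning applies to the original `WaitAll`: the `if (jobs_left > 0)` test and the final `lk.unlock()` are both redundant, because the predicate overload of `wait` checks the condition before blocking and the lock is released on scope exit.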

Upvotes: 1
