stfx
stfx

Reputation: 149

Correctly using atomic<T>

I am currently struggling with writing lockless code for atomic variables for a C++11 thread pool class based on https://github.com/nbsdx/ThreadPool. Here is my modified class:

class ThreadPool
{
public:
    explicit ThreadPool(int threadCount) :
        _jobsLeft(0),
        _bailout(false)
    {
        _threads = std::vector<std::thread>(threadCount);
        for (int index = 0; index < threadCount; ++index)
        {
            _threads[index] = std::move(std::thread([this]
                {
                    this->Task();
                }));
        }
    }

    void AddJob(std::function<void(void)> job)
    {
        {
            std::lock_guard<std::mutex> lock(_queueMutex);
            _queue.emplace(job);
        }
        {
            std::lock_guard<std::mutex> lock(_jobsLeftMutex);
            ++_jobsLeft;
        }
        _jobAvailableVar.notify_one();
    }

    void JoinAll()
    {
        if (!_bailout)
        {
            _bailout = true;
            _jobAvailableVar.notify_all();
            for (auto& x : _threads)
            {
                if (x.joinable())
                {
                    x.join();
                }
            }
        }
    }

    void WaitAll()
    {
        std::unique_lock<std::mutex> lock(_jobsLeftMutex);
        if (_jobsLeft > 0)
        {
            _waitVar.wait(lock, [this]
                          {
                              return _jobsLeft == 0;
                          });
        }
    }

private:
    void Task()
    {
        while (!_bailout)
        {
            std::function<void(void)> job;
            {
                std::unique_lock<std::mutex> lock(_queueMutex);
                _jobAvailableVar.wait(lock, [this]
                                      {
                                          return _queue.size() > 0 || _bailout;
                                      });
                if (_bailout)
                {
                    return;
                }
                job = _queue.front();
                _queue.pop();
            }
            job();
            {
                std::lock_guard<std::mutex> lock(_jobsLeftMutex);
                --_jobsLeft;
            }
            _waitVar.notify_one();
        }
    }

    std::vector<std::thread> _threads;
    std::queue<std::function<void(void)>> _queue;
    int _jobsLeft;
    std::atomic<bool> _bailout;
    std::condition_variable _jobAvailableVar;
    std::condition_variable _waitVar;
    std::mutex _jobsLeftMutex;
    std::mutex _queueMutex;
};

I am puzzled about the following and would really appreciate any pointers:

EDIT: Final version with normal locks is available here: https://github.com/stfx/ThreadPool2

Upvotes: 2

Views: 196

Answers (1)

Slupka
Slupka

Reputation: 121

Look at the atomic methods: http://en.cppreference.com/w/cpp/atomic/atomic

You should use the load method to atomically read the value. However, be aware that the following is not atomic.

if (!_bailout)
{
    _bailout = true;

There is a method to perform these comparison and value swaps. See compare_exchange_weak method. For the jobCount you can use atomic. The ++ and -- are atomic.

However, be aware that the atomic is only a single method call, after the call the situation may change. You still need a synchronized queue and for that you need a lock. It does not need to be a standard OS lock, you can create a lock using atomic variable (using the store and compare_exchange_weak methods). See the following post: https://webkit.org/blog/6161/locking-in-webkit/

Upvotes: 1

Related Questions