fabian

Can I use std::barrier to replace individual atomic flags in a threadpool?

Can I use a std::barrier in a simple threadpool to replace individual atomic flags?

Consider the simple threadpool below. The main thread signals each worker thread to start working through that thread's own std::atomic_flag. The loop for an individual thread is:

void ThreadPool::IndividualThread::main() {
    while (true) {
        // Sleep until the main thread sets this worker's flag.
        flag.wait(false);
        // Claim indices r, r-1, ..., 0 until the shared counter goes negative.
        int i{0};
        while ((i = pool->idx.fetch_sub(1)) > -1) {
            (*(pool->task))(pool->ctx, i);
        }
        flag.clear(std::memory_order::relaxed);
        // The last worker to finish wakes the main thread.
        if ((pool->completed_tasks.fetch_add(1) + 1) == pool->threads.size()) {
            pool->finsish_flag.test_and_set(std::memory_order::release);
            pool->finsish_flag.notify_one();
        }
    }
}

The main thread coordinates the work through the QueueTask function as follows:

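void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
                           int r) {
    // Publish the job before waking any worker.
    task = function;
    ctx = context;
    completed_tasks.store(0);
    idx.store(r);  // workers claim indices r, r-1, ..., 0
    // Wake every worker through its own flag.
    for (auto& thread : storage) {
        thread.flag.test_and_set();
        thread.flag.notify_one();
    }
    // Block until the last worker sets finsish_flag, then reset for the next job.
    finsish_flag.wait(false);
    completed_tasks.store(0);
    finsish_flag.clear(std::memory_order::relaxed);
}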

(I have omitted the termination condition for the individual threads and the check that the number of threads is positive.) The class declaration is:

#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

class ThreadPool {
    struct IndividualThread {
        ThreadPool* pool{nullptr};
        std::atomic_flag flag{};  // clear by default since C++20
        void main();
    };

   public:
    using function = void(void* context, size_t idx);
    explicit ThreadPool(size_t num_threads);
    ~ThreadPool();
    void QueueTask(function*, void* context, int range);

   private:
    std::vector<IndividualThread> storage;   // one start flag per worker
    std::vector<std::jthread> threads;
    std::atomic<size_t> completed_tasks{0};  // workers done with the current job
    std::atomic_flag finsish_flag{};         // used above; declaration assumed
    std::atomic<int> idx{0};                 // next index to claim
    function* task{nullptr};
    void* ctx{nullptr};
};

For some time, I've been wondering whether I can use std::barrier as the coordination mechanism for the threadpool. Inspired by Anthony Williams's talks about the C++20 concurrency features (for example here, slide 63), I am thinking of rewriting the two functions as follows:

std::barrier<std::function<void()>> start{num_threads, {}};
std::barrier<std::function<void()>> end{num_threads, []{
           pool->finsish_flag.test_and_set(std::memory_order::release);
           pool->finsish_flag.notify_one();
}};

void ThreadPool::IndividualThread::main() {
    while (true) {
       start.wait();
       int i{0};
       while ((i = pool->idx.fetch_sub(1)) > -1) {
            (*(pool->task))(pool->ctx, i);
       }
       end.arrive_and_wait();
    }
}
void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
                           int r) {
    task = function;
    ctx = context;
    idx.store(r);
    start.signal();
    finsish_flag.wait(false);
    finsish_flag.clear(std::memory_order::relaxed);
}

Of course, this code doesn't compile, but it hopefully shows the gist. Does anybody have an idea how std::barrier could be employed here?
