Reputation: 1881
Can I use a std::barrier
in a simple threadpool to replace individual atomic flags?
Consider the simple threadpool below: Threads are signaled to start working through an individual std::atomic_flag
by the main thread. The loop for the individual thread is:
void ThreadPool::IndividualThread::main() {
while (true) {
flag.wait(false);
int i{0};
while ((i = pool->idx.fetch_sub(1)) > -1) {
(*(pool->task))(pool->ctx, i);
}
flag.clear(std::memory_order::relaxed);
if ((pool->completed_tasks.fetch_add(1) + 1) == pool->threads.size()) {
pool->finsish_flag.test_and_set(std::memory_order::release);
pool->finsish_flag.notify_one();
}
}
}
The main thread coordinates the work through the QueueTask
function as follows:
void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
int r) {
task = function;
ctx = context;
completed_tasks.store(0);
idx.store(r);
for (ThreadStorage& thread : storage) {
thread.flag.test_and_set();
thread.flag.notify_one();
}
finsish_flag.wait(false);
completed_tasks.store(0);
finsish_flag.clear(std::memory_order::relaxed);
}
(I have omitted the termination condition for the individual threads and checks whether the number of threads is positive). The class declaration is:
class ThreadPool {
struct IndividualThread {
ThreadPool* pool{nullptr};
std::atomic_flag flag{false};
void main();
};
public:
using function = void(void* context, size_t idx);
explicit ThreadPool(size_t num_threads);
~ThreadPool();
void QueueTask(function*, void* context, int range);
private:
std::vector<ThreadStorage> storage;
std::vector<std::jthread> threads;
std::atomic<size_t> completed_tasks{0}; // wait on new jobs or termination
std::atomic<int> idx{0};
function* task{nullptr};
void* ctx{nullptr};
};
For some time, I've been wondering whether I can use ``std::barrier` as a coordination mechanism for the threadpool. Inspired by Anthony Williams's talks about the c++20 concurrency features (for example here slide 63), I am thinking of rewriting the two functions as follows:
std::barrier<std::function<void()>> start{num_threads, {}};
std::barrier<std::function<void()>> end{num_threads, []{
pool->finsish_flag.test_and_set(std::memory_order::release);
pool->finsish_flag.notify_one();
}};
void ThreadPool::IndividualThread::main() {
while (true) {
start.wait();
int i{0};
while ((i = pool->idx.fetch_sub(1)) > -1) {
(*(pool->task))(pool->ctx, i);
}
end.arrive_and_wait();
}
}
void ThreadPool::QueueTask(ThreadPool::function* function, void* context,
int r) {
task = function;
ctx = context;
idx.store(r);
start.signal();
finsish_flag.wait(false);
finsish_flag.clear(std::memory_order::relaxed);
}
Of course, this code doesn't compile but hopefully shows the gist of it. Does somebody have an idea how std::barriers could be employed here?
Upvotes: 0
Views: 57