aram

Reputation: 1444

Correct way to wait on a condition variable that is notified by several threads

I'm trying to do this with the C++11 concurrency support.

I have a pool of worker threads that all do the same thing. A master thread holds an array of condition variables, one per worker, because the workers need to start each cycle in sync (i.e. none of them may run one loop iteration ahead). The master starts a cycle like this:

    for (auto &worker_cond : cond_arr) {
        worker_cond.notify_one();
    }

The master thread then has to wait for a notification from every thread in the pool before it restarts its cycle. What's the correct way of doing this? Should I have a single condition variable and wait on an integer that each worker thread increments? Something like this (still in the master thread):

    std::unique_lock<std::mutex> lock(workers_mtx);
    workers_finished.wait(lock, [&] { return workers == cond_arr.size(); });

Upvotes: 1

Views: 2096

Answers (2)

Yakk - Adam Nevraumont

Reputation: 275250

I see nothing fundamentally wrong with your solution.

Guard every read and write of workers with workers_mtx and you are done.
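
As a minimal sketch of that guarded-counter approach: the names workers, workers_mtx and workers_finished come from the question, while run_one_cycle and num_workers are hypothetical names introduced only for illustration. Each worker increments the counter under the mutex and then notifies; the master waits with a predicate, so spurious wakeups and early notifications are harmless.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: runs num_workers threads for one cycle and returns
// the counter value the master saw when its wait returned.
std::size_t run_one_cycle(std::size_t num_workers) {
    std::mutex workers_mtx;
    std::condition_variable workers_finished;
    std::size_t workers = 0;  // guarded by workers_mtx

    std::vector<std::thread> pool;
    for (std::size_t i = 0; i < num_workers; ++i) {
        pool.emplace_back([&] {
            // ... one cycle of real work would go here ...
            {
                std::lock_guard<std::mutex> lock(workers_mtx);
                ++workers;  // touch the shared counter only under the mutex
            }
            workers_finished.notify_one();  // only the master waits on this
        });
    }

    std::size_t seen = 0;
    {
        std::unique_lock<std::mutex> lock(workers_mtx);
        // The predicate is re-checked after every wakeup, with the lock held.
        workers_finished.wait(lock, [&] { return workers == num_workers; });
        seen = workers;
        workers = 0;  // reset under the same lock, ready for the next cycle
    }
    for (auto& t : pool) t.join();
    assert(seen == num_workers);
    return seen;
}
```

Calling run_one_cycle(4) blocks until all four workers have reported. In a real pool the threads would be long-lived and the reset of workers would happen once per cycle instead of once per call.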

We could abstract this with a counting semaphore.

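    #include <condition_variable>
    #include <cstddef>
    #include <memory>
    #include <mutex>

    struct counting_semaphore {
      // Held through unique_ptr so the semaphore stays movable (e.g. inside a std::vector).
      std::unique_ptr<std::mutex> m=std::make_unique<std::mutex>();
      std::ptrdiff_t count = 0;
      std::unique_ptr<std::condition_variable> cv=std::make_unique<std::condition_variable>();

      counting_semaphore( std::ptrdiff_t c=0 ):count(c) {}
      counting_semaphore(counting_semaphore&&)=default;

      // Block until at least n tokens are available, then remove them.
      void take(std::size_t n = 1) {
        std::unique_lock<std::mutex> lock(*m);
        cv->wait(lock, [&]{ return count >= std::ptrdiff_t(n); });
        count -= n;  // still holding the lock here
      }
      // Add n tokens and wake every waiter if the count is now positive.
      void give(std::size_t n = 1) {
        {
          std::unique_lock<std::mutex> lock(*m);
          count += n;
          if (count <= 0) return;
        }
        cv->notify_all();  // notify after releasing the lock
      }
    };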

take removes n from count, and blocks until count is at least n.

give adds n to count, and notifies all waiters if the resulting count is positive.

Now the worker threads ferry tokens between two semaphores.

    // Parentheses, not braces: this creates count default-constructed semaphores, one per worker.
    std::vector<counting_semaphore> m_worker_start = std::vector<counting_semaphore>(count);
    counting_semaphore m_worker_done{0}; // not count, zero
    std::atomic<bool> m_shutdown{false};

    // master controller:
    for (each step) {
      for (auto&& starts:m_worker_start)
        starts.give();            // release every worker for this step
      m_worker_done.take(count);  // block until all workers have reported back
    }

    // master shutdown:
    m_shutdown = true;
    // wake up forever:
    for (auto&& starts:m_worker_start)
      starts.give(std::size_t(-1)/2);

    // worker thread:
    while (true) {
      master->m_worker_start[my_id].take();
      if (master->m_shutdown) return;
      // do work
      master->m_worker_done.give();
    }

or somesuch.

live example.
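
For readers who want something that compiles on its own, here is one possible way to wire the pieces above together. It repeats the counting_semaphore from this answer; run_pool, num_workers, steps and work_done are hypothetical names added for illustration, and the "work" is just an atomic increment. Treat it as a sketch under those assumptions, not as the linked example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// The semaphore from the answer, repeated so this sketch is self-contained.
struct counting_semaphore {
    std::unique_ptr<std::mutex> m = std::make_unique<std::mutex>();
    std::ptrdiff_t count = 0;
    std::unique_ptr<std::condition_variable> cv =
        std::make_unique<std::condition_variable>();

    counting_semaphore(std::ptrdiff_t c = 0) : count(c) {}
    counting_semaphore(counting_semaphore&&) = default;

    void take(std::size_t n = 1) {
        std::unique_lock<std::mutex> lock(*m);
        cv->wait(lock, [&] { return count >= static_cast<std::ptrdiff_t>(n); });
        count -= static_cast<std::ptrdiff_t>(n);
    }
    void give(std::size_t n = 1) {
        {
            std::unique_lock<std::mutex> lock(*m);
            count += static_cast<std::ptrdiff_t>(n);
            if (count <= 0) return;
        }
        cv->notify_all();
    }
};

// Hypothetical driver: runs `steps` lock-step cycles over `num_workers`
// threads and returns the total number of work units performed.
std::size_t run_pool(std::size_t num_workers, std::size_t steps) {
    std::vector<counting_semaphore> worker_start(num_workers);
    counting_semaphore worker_done{0};
    std::atomic<bool> shutdown{false};
    std::atomic<std::size_t> work_done{0};

    std::vector<std::thread> pool;
    for (std::size_t id = 0; id < num_workers; ++id) {
        pool.emplace_back([&, id] {
            while (true) {
                worker_start[id].take();  // wait for the master's start token
                if (shutdown) return;
                ++work_done;              // stand-in for the real work
                worker_done.give();       // hand one token back to the master
            }
        });
    }

    for (std::size_t step = 0; step < steps; ++step) {
        for (auto& start : worker_start) start.give();
        worker_done.take(num_workers);    // blocks until every worker reported
        // No worker can have run ahead: each got exactly one token per step.
        assert(work_done == (step + 1) * num_workers);
    }

    shutdown = true;
    // Each worker takes once more and then returns, so one token suffices here.
    for (auto& start : worker_start) start.give();
    for (auto& t : pool) t.join();
    return work_done;
}
```

For instance, run_pool(3, 5) performs 15 units of work in total. The assertion inside the step loop checks the lock-step property the question asks for.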

Upvotes: 1

Anedar

Reputation: 4265

I see two options here:

Option 1: join()

Basically, instead of using a condition variable to start the calculations in your threads, you spawn a new thread per worker for every iteration and use join() to wait for them to finish. Then you spawn new threads for the next iteration, and so on.
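
A minimal sketch of this option, assuming each worker only writes to its own slot of a result vector; run_with_join, num_workers, iterations and results are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical driver: spawns fresh threads for every iteration and joins
// them before starting the next one. Returns one counter per worker slot.
std::vector<std::size_t> run_with_join(std::size_t num_workers,
                                       std::size_t iterations) {
    std::vector<std::size_t> results(num_workers, 0);
    for (std::size_t it = 0; it < iterations; ++it) {
        std::vector<std::thread> threads;
        threads.reserve(num_workers);
        for (std::size_t id = 0; id < num_workers; ++id) {
            // Each thread writes only its own slot, so no lock is needed.
            threads.emplace_back([&results, id] { ++results[id]; });
        }
        // join() acts as the barrier: nothing runs ahead into the next iteration.
        for (auto& t : threads) t.join();
        for (std::size_t v : results) assert(v == it + 1);
    }
    return results;
}
```

The trade-off is that thread creation and teardown are paid on every iteration, which can matter when the per-iteration work is small.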

Option 2: locks

You don't want the main thread to notify while any of the worker threads is still working. So each thread gets its own lock, which it locks before doing the calculations and unlocks afterwards. Your main thread locks all of them before notifying the workers and unlocks them afterwards.

Upvotes: 1
