eparra
eparra

Reputation: 28

How to synchronize instances of a function running on different threads (in c++11)?

Let's say there's a number of threads that consist of a loop running instances of the same function, but the start of every iteration needs to be synchronized (so the threads that finish first have to wait for the last one to begin a new iteration). How can this be done in c++11?

...

The rest of the post is just what I've tried and how it fails.

I'm using a counter, "sync", initially set to 3 (the number of threads). Every thread, at the end of the function will subtract 1 from this counter and start waiting. When the counter reaches 0, that means that the 3 of them have finished one round, so the main thread will reset the counter to 3 and notify the threads to wake them up.

This works most of the time but sometimes one or two of the threads fail to wake up.

So these are the global variables:

mutex syncMutex;
condition_variable syncCV;
int sync;

This is at the end of the function that runs in a loop in the threads:

unique_lock<mutex> lk(syncMutex);
cout << "Thread num: " << mFieldNum << " got sync value: " << sync;
sync --;
syncCV.notify_all();
cout << " and goes to sleep..." << endl;
syncCV.wait(lk, []{return sync == numFields;});
cout << "Thread num: " << mFieldNum << " woke up" << endl;
}

And this runs in a loop in the main thread:

unique_lock<mutex> lk(syncMutex);
syncCV.wait(lk, []{return sync == 0;});
sync = 3;
lk.unlock();
cout << "Notifying all threads!" << endl;
syncCV.notify_all();

This is the output it produces when it fails (thread #3 doesn't wake up):

Thread num: 1 got sync value: 3 and goes to sleep...
Thread num: 2 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 1 woke up
Thread num: 2 woke up
Thread num: 3 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 2 woke up
Thread num: 1 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...

Does anyone have a clue? Thank you for reading.

Upvotes: 1

Views: 1371

Answers (1)

K. Frank
K. Frank

Reputation: 38

There are a number of issues with your thread synchronization. Tony has mentioned one in his comment. You also have a potential race condition in your main loop code where you call lk.unlock() before calling syncCV.notify_all(). (This could permit a thread to miss the notify_all signal.)

I would adjust your code in two ways. First, to address the use of "sync == numFields" as your condition, which, as Tony noted, can fail to be true after another thread has executed sync--, it makes sense to use as your condition that each thread run only once per main-thread loop. In my example code, this is achieved by introducing the "done[numFields]" variables. Second, it makes sense to introduce two condition variables -- one to signal the worker threads that a new main-loop iteration has started, and a second to signal the main thread that the worker threads are done. (Notice that the two condition variables use the same mutex.)

Here is a complete program, modelled on your sample code, that incorporates these two approaches:

#include <iostream>
using std::cout;
using std::endl;

#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

std::mutex syncMutex;
std::condition_variable readyCV;
std::condition_variable doneCV;
int sync;
bool exitFlag;

const int numFields = 5;
bool done[numFields];

const int nloops = 10;

void thread_func(int i) {
  int mFieldNum = i;
  while (true) {
    std::unique_lock<std::mutex> lk(syncMutex);
    readyCV.wait(lk, [mFieldNum]{return  exitFlag || !done[mFieldNum-1];});
    if (exitFlag)  break;
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << sync;
    if (--sync == 0)  doneCV.notify_all();
    done[mFieldNum-1] = true;
    readyCV.notify_all();
    cout << " and goes to sleep..." << endl;
  }
}

int main (int argc, char* argv[]) {
  exitFlag = false;
  sync = 0;
  std::vector<std::thread> threads;
  for (int i = 0; i < numFields; i++) {
    done[i] = true;
    threads.emplace_back (thread_func, i+1);
  }
  for (int i = 0; i <= nloops; i++) {
    std::unique_lock<std::mutex> lk(syncMutex);
    doneCV.wait(lk, []{return sync == 0;});
    cout << "main loop (lk held), i = " << i << endl;
    sync = numFields;
    if (i == nloops)  exitFlag = true;
    else              for (auto &b : done)  b = false;
    cout << "Notifying all threads!" << endl;
    readyCV.notify_all();
  }

  for (auto& t : threads)  t.join();
}

(I've also added an exitFlag and std::thread::join()'s so the program can clean up and terminate nicely.)

This is very similar to a classic producer-consumer implementation (one producer, numFields consumers), with the added constraint that each consumer thread will run only once per producer thread loop.

You can also achieve essentially the same program logic more simply if you are willing to forgo reusing the worker threads. (In your sample code and my above example, they are acting as a sort of specialized thread pool.) In my next example, new threads are created for each iteration of the main loop. This makes the thread synchronization simpler and eliminates the condition variables.

#include <iostream>
using std::cout;
using std::endl;

#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

std::mutex coutMutex;

std::atomic<int> sync;

const int numFields = 5;
bool done[numFields];

const int nloops = 10;

void thread_func(int i) {
  int mFieldNum = i;
  int mySync = sync--;
  {
    std::lock_guard<std::mutex> lk(coutMutex);
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << mySync << endl;
  }
}  

int main (int argc, char* argv[]) {
  for (int i = 0; i < nloops; i++) {
    cout << "main loop, i = " << i << endl;
    std::vector<std::thread> threads;
    sync = numFields;
    for (int i = 0; i < numFields; i++)  threads.emplace_back (thread_func, i+1);
    for (auto& t : threads)  t.join();
  }
}

(coutMutex is a nicety so that the console output doesn't get garbled, but it is not necessary for the core synchronization logic.)

If in your real-world use case you don't need the thread_func to stay alive from iteration to iteration (for example, to preserve some state), and if each call to thread_func does enough work that the cost of creating a new thread to run it doesn't really matter in comparison, then creating new threads for each main-loop iteration (instead of reusing threads) is straightforward, sensible, and simpler.

Happy Multi-Threaded Hacking!

K. Frank

Upvotes: 1

Related Questions