Adam
Adam

Reputation: 3778

C++ 11 multi-threading with condition variables

I have three worker threads that I wish to coordinate from main() using condition variables in C++ 11. The code below illustrates my approach:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

using namespace std;

const int T_COUNT = 3;
thread* Threads[T_COUNT];
condition_variable cv[T_COUNT];
mutex m[T_COUNT];
bool Ready[T_COUNT] = {0};
bool Running[T_COUNT] = {0};

void worker(int tid) {
    while (1) {
        unique_lock<mutex> lk(m[tid]);
        cv[tid].wait(lk, [&]{return Ready[tid];});

        cout << "Processing thread #" << tid << endl;

        Ready[tid] = false;
        lk.unlock();
        cv[tid].notify_all();
    }
}

int main() {
    int tid = 0;
    while (1) {
        if (Running[tid]) {
            unique_lock<mutex> lk(m[tid]);
            cv[tid].wait(lk, [&]{return !Ready[tid];});
            Ready[tid] = true;
            lk.unlock();
            cv[tid].notify_all();
        } else {
            cout << "Creating thread #" << tid << endl;
            Threads[tid] = new thread([&]{
                worker(tid);
            });
            Running[tid] = true;
            cv[tid].notify_all();
        }
        tid = (tid + 1) % T_COUNT;
    }
}

I want this code to produce output like the following:

...
Processing thread #0
Processing thread #1
Processing thread #2
Processing thread #0
Processing thread #1
Processing thread #2
...

It doesn't matter when a thread's loop finishes an iteration, it only matters that each thread iteration begins in sequence with the other threads. To illustrate visually what I'm aiming for:

|----T0----|
 |----T1----|
     |----T2----|
            |----T0----|
               |----T1----|
                 |----T2----|

The code above does not do this, and I've failed to figure out why on my own. Any help is greatly appreciated!

Upvotes: 1

Views: 1214

Answers (1)

Garland
Garland

Reputation: 921

Thread Synchronization Mechanisms

For threads in a program to communicate with each other and synchronize access to shared resources.

Synchronization Primitives

  • Mutex (C++11)

Special std::atomic: Mutex protected scalar variable.

  • Semaphore
  • Condition Variable (C++11)

Not to be used by itself alone, but with std::mutex.

  • Event Flag (Each bit in the flag represents a condition)
  • Message Box (Mailbox)

NOTE: Semaphore and Mailbox can be implemented using Mutex and Condition Variable.

Semaphore – to solve Producer/Consumer problem

A semaphore contains a count indicating whether a resource is locked or available. Semaphore is signaling mechanism (“I am done, you can carry on.”). The resource itself may not be thread safe.

Producer

semObject.Post();   // Send the signal

Increase the semaphore count by 1. If a thread is waiting on the specified semaphore, it is awakened.[1]

Consumer

semObject.Wait();   // Wait for the signal

When the semaphore count is zero, the thread calling this function will wait for the semaphore. When the semaphore count is nonzero, the count will be decremented by 1 and the thread calling this function will continue.[1]

Semaphore can be implemented using Mutex and Condition Variable

void Semaphore::Post()
{
    {
        lock_guard<mutex> lockGuard(m_mtx);
        ++m_count;
    }   // release lock
    m_cv.notify_one();  // Since the count increments only 1, notifying one is enough. This call does not need to be locked.
}

void Semaphore::Wait(const bool resetCount /*= false*/)
{
    unique_lock<mutex> mtxLock(m_mtx);      // Must use unique_lock with condition variable.
    m_cv.wait(mtxLock, [this]{ return m_count > 0; });      // Blocking till count is not zero.
    if (resetCount)
    {
        m_count = 0;    // Reset the signal.
    }
    else
    {
        --m_count;      // Normal wait.
    }
}

Use Semaphore to solve your problem

#include "Semaphore.h"

using namespace std;

const int T_COUNT = 3;
static Semaphore sem[T_COUNT];

void Worker(int tid) 
{
    for (;;)
    {
        sem[tid].Wait();    // Wait for signal, blocking.
        cout << "Processing thread #" << tid << endl;
        sem[(tid + 1) % T_COUNT].Post();    // Inform next worker to run.
        this_thread::sleep_for(chrono::seconds(tid + 1));   // Actual timing consuming work.
    }
}

int main()
{
    // Create all threads first.
    thread th[T_COUNT];
    for (int i = 0; i < T_COUNT; ++i)
    {
        th[i] = thread(Worker, i);
    }

    // Start to run by signaling the first worker.
    sem[0].Post();

    // End of work.
    for (int i = 0; i < T_COUNT; ++i)
    {
        th[i].join();
    }
}

Reference

[1] Massa, Anthony J., Embedded software development with eCos, Pearson Education, Inc., 2002

Upvotes: 3

Related Questions