I have three worker threads that I want to coordinate from main() using condition variables in C++11. The code below illustrates my approach:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
const int T_COUNT = 3;
thread* Threads[T_COUNT];
condition_variable cv[T_COUNT];
mutex m[T_COUNT];
bool Ready[T_COUNT] = {0};
bool Running[T_COUNT] = {0};
void worker(int tid) {
    while (1) {
        unique_lock<mutex> lk(m[tid]);
        cv[tid].wait(lk, [&]{return Ready[tid];});
        cout << "Processing thread #" << tid << endl;
        Ready[tid] = false;
        lk.unlock();
        cv[tid].notify_all();
    }
}
int main() {
    int tid = 0;
    while (1) {
        if (Running[tid]) {
            unique_lock<mutex> lk(m[tid]);
            cv[tid].wait(lk, [&]{return !Ready[tid];});
            Ready[tid] = true;
            lk.unlock();
            cv[tid].notify_all();
        } else {
            cout << "Creating thread #" << tid << endl;
            Threads[tid] = new thread([&]{
                worker(tid);
            });
            Running[tid] = true;
            cv[tid].notify_all();
        }
        tid = (tid + 1) % T_COUNT;
    }
}
I want this code to produce output like the following:
...
Processing thread #0
Processing thread #1
Processing thread #2
Processing thread #0
Processing thread #1
Processing thread #2
...
It doesn't matter when a thread's loop finishes an iteration; it only matters that each thread's iteration begins in sequence with the other threads. To illustrate visually what I'm aiming for:
|----T0----|
    |----T1----|
        |----T2----|
            |----T0----|
                |----T1----|
                    |----T2----|
The code above does not do this, and I've failed to figure out why on my own. Any help is greatly appreciated!
Reputation: 921
Synchronization primitives let the threads in a program communicate with each other and synchronize access to shared resources.
Synchronization Primitives
std::atomic: a scalar variable whose accesses are synchronized, as if it were protected by a mutex.
std::condition_variable: not to be used on its own, only together with a std::mutex.
NOTE: Semaphore and Mailbox can be implemented using Mutex and Condition Variable.
A semaphore contains a count indicating whether a resource is locked or available. A semaphore is a signaling mechanism (“I am done, you can carry on.”). The resource itself may not be thread-safe.
Producer
semObject.Post(); // Send the signal
Increase the semaphore count by 1. If a thread is waiting on the specified semaphore, it is awakened.[1]
Consumer
semObject.Wait(); // Wait for the signal
When the semaphore count is zero, the thread calling this function will wait for the semaphore. When the semaphore count is nonzero, the count will be decremented by 1 and the thread calling this function will continue.[1]
void Semaphore::Post()
{
    {
        lock_guard<mutex> lockGuard(m_mtx);
        ++m_count;
    } // release lock
    m_cv.notify_one(); // Since the count increments only 1, notifying one is enough. This call does not need to be locked.
}
void Semaphore::Wait(const bool resetCount /*= false*/)
{
    unique_lock<mutex> mtxLock(m_mtx); // Must use unique_lock with condition variable.
    m_cv.wait(mtxLock, [this]{ return m_count > 0; }); // Blocking till count is not zero.
    if (resetCount)
    {
        m_count = 0; // Reset the signal.
    }
    else
    {
        --m_count; // Normal wait.
    }
}
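Post() and Wait() above belong to a Semaphore class declared in Semaphore.h, which isn't shown here. A minimal sketch of what that header could contain follows. The class layout, the constructor and the ProduceAndConsume() helper are assumptions made for illustration; only the two member function bodies come from the code above.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical Semaphore.h: the declaration is assumed, not taken from the answer.
class Semaphore
{
public:
    explicit Semaphore(int initialCount = 0) : m_count(initialCount)
    {
        assert(initialCount >= 0); // A negative count makes no sense here.
    }
    void Post();                        // Increment the count and wake one waiter.
    void Wait(bool resetCount = false); // Block until the count is nonzero.

private:
    std::mutex m_mtx;             // Protects m_count.
    std::condition_variable m_cv; // Signaled whenever m_count is incremented.
    int m_count;
};

void Semaphore::Post()
{
    {
        std::lock_guard<std::mutex> lockGuard(m_mtx);
        ++m_count;
    } // Release the lock before notifying.
    m_cv.notify_one();
}

void Semaphore::Wait(bool resetCount)
{
    std::unique_lock<std::mutex> mtxLock(m_mtx);
    m_cv.wait(mtxLock, [this]{ return m_count > 0; });
    if (resetCount)
    {
        m_count = 0; // Consume every pending signal.
    }
    else
    {
        --m_count;   // Consume one signal.
    }
}

// Hypothetical usage helper: a producer thread hands a value to the caller.
int ProduceAndConsume()
{
    Semaphore ready;
    int value = 0;
    std::thread producer([&]{
        value = 42;   // Written before the signal is sent.
        ready.Post(); // "I am done, you can carry on."
    });
    ready.Wait();     // Blocks until the producer has posted.
    int seen = value;
    producer.join();
    return seen;
}
```

With a default count of zero, an array such as `static Semaphore sem[T_COUNT];` starts with every semaphore unsignaled, so each waiter blocks until someone posts.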
#include <chrono>
#include <iostream>
#include <thread>
#include "Semaphore.h"
using namespace std;
const int T_COUNT = 3;
static Semaphore sem[T_COUNT];
void Worker(int tid)
{
    for (;;)
    {
        sem[tid].Wait(); // Wait for signal, blocking.
        cout << "Processing thread #" << tid << endl;
        sem[(tid + 1) % T_COUNT].Post(); // Inform next worker to run.
        this_thread::sleep_for(chrono::seconds(tid + 1)); // Simulate the actual time-consuming work.
    }
}
int main()
{
    // Create all threads first.
    thread th[T_COUNT];
    for (int i = 0; i < T_COUNT; ++i)
    {
        th[i] = thread(Worker, i); // tid is passed by value.
    }
    // Start to run by signaling the first worker.
    sem[0].Post();
    // Wait for the workers. They loop forever, so this blocks indefinitely.
    for (int i = 0; i < T_COUNT; ++i)
    {
        th[i].join();
    }
}
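If you would rather stay with a plain condition variable, as in the question, the same hand-off can be sketched with one mutex, one condition variable and a shared "turn" counter instead of a chain of semaphores. This is an alternative technique, not the semaphore approach above. RunRoundRobin and its parameters are hypothetical names, and the loop is bounded only so that the start order can be checked. Note that each thread receives its id by value, whereas the lambda in the question captures tid by reference while main() keeps changing it.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: runs threadCount workers for `rounds` iterations each
// and returns the order in which the iterations began.
std::vector<int> RunRoundRobin(int threadCount, int rounds)
{
    std::mutex mtx;
    std::condition_variable cv;
    int turn = 0;           // Id of the worker allowed to start next.
    std::vector<int> order; // Start order, protected by mtx.

    auto worker = [&](int tid) {
        for (int r = 0; r < rounds; ++r)
        {
            {
                std::unique_lock<std::mutex> lk(mtx);
                cv.wait(lk, [&]{ return turn == tid; }); // Wait for my turn.
                order.push_back(tid);                    // The iteration begins here.
                turn = (tid + 1) % threadCount;          // Hand over to the next worker.
            }
            cv.notify_all(); // All workers share one cv, so wake them all.
            // The time-consuming work would go here, outside the lock,
            // so it can overlap with the next worker's iteration.
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; ++i)
    {
        threads.emplace_back(worker, i); // The id is copied into the thread.
    }
    for (auto& t : threads)
    {
        t.join();
    }
    return order;
}
```

Calling `RunRoundRobin(3, 2)` should yield the start order 0, 1, 2, 0, 1, 2, because a worker can only proceed when `turn` equals its own id.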
[1] Massa, Anthony J., Embedded software development with eCos, Pearson Education, Inc., 2002