Reputation: 235
I'm trying to write a simple multithreading example using for loops. I want the threads to loop in lockstep, like this:
Thread 1 printing 0
Thread 2 printing 0
Thread 3 printing 0
Thread 4 printing 0
Thread 1 printing 1
Thread 2 printing 1
Thread 3 printing 1
Thread 4 printing 1
This means: they all print "0", then they all wait for everyone to have done that, then they all print "1", wait again for everyone, print "2", etc.
So I wrote this code:
#include <iostream>
#include <thread>
#include <string.h>
using namespace std;
// One flag per printer thread (index == thread id); true means that thread has
// not yet finished printing its current round.
// NOTE: plain bools, read and written by several threads without synchronization.
bool flags[4] = {true, true, true, true};

// Reports whether at least one entry of flags is currently true.
// Every entry is read on each call (no early exit).
bool checkAll(){
    bool anySet = false;
    for (const bool flag : flags) {
        anySet |= flag;
    }
    return anySet;
}
// Body of one printer thread: prints the values 0..99 and, after each print,
// spins until checkAll() reports that no flag is set.
// id: index of this thread's entry in flags.
// NOTE(review): flags is a plain bool array that this function writes while
// checkAll() reads it from the other threads, with no mutex or atomic involved.
void printer(int id){
for(int i=0;i<100;i++){
// Raise this thread's flag for the duration of the print.
flags[id] = true;
cout << "Thread: " << id << " printing " << i << endl;
// Lower the flag: this thread has finished the current round.
flags[id] = false;
// Busy-wait while any flag is still raised.
// NOTE(review): a thread that leaves this loop first raises its flag again at the
// top of the next iteration, possibly before the others have seen the all-false
// state -- presumably why the rounds do not stay aligned.
while(checkAll()) {}
}
}
// Starts four printer threads with ids 0..3, then waits for all of them.
int main(int argc, char *argv[]){
    // Braced-init-list elements are evaluated left to right, so the threads
    // are started in id order.
    thread workers[] = {
        thread(printer, 0),
        thread(printer, 1),
        thread(printer, 2),
        thread(printer, 3)
    };

    // Join in reverse creation order.
    for (int k = 3; k >= 0; --k) {
        workers[k].join();
    }
    return 0;
}
But it doesn't work as expected. As far as I know, it does not work due to concurrency problems (several threads reading/writing the same variable).
So, I tried to solve it using condition variables:
#include <iostream>
#include <thread>
#include <string.h>
#include <mutex>
#include <condition_variable>
using namespace std;
// Shared state for the condition-variable attempt.
// flags[id] is raised by printer `id` while it prints and lowered afterwards.
bool flags[4] = {true, true, true, true};
// Locked by every printer for the whole of its loop.
mutex m;
// Printers block on this; the controller calls notify_all() on it.
condition_variable g_queuecheck;
// Set by main after the printers have been joined; polled by the controller.
bool done = false;

// Returns true if any of the four flags is raised, false when all are lowered.
bool checkAll(){
    bool pending = false;
    int slot = 0;
    while (slot < 4) {
        pending = flags[slot] | pending;
        ++slot;
    }
    return pending;
}
// Body of one printer thread (condition-variable attempt): prints 0..99 and
// blocks on g_queuecheck after every print.
// id: index of this thread's entry in flags.
void printer(int id){
// m is locked here for the whole function; it is released only while the
// thread is blocked inside wait() below.
unique_lock<mutex> locker(m);
for(int i=0;i<100;i++){
flags[id] = true;
cout << "Thread: " << id << " printing " << i << endl;
flags[id] = false;
// wait() is called without a predicate: a notify_all() issued while this
// thread is not yet waiting is lost, and a spurious wakeup is not re-checked.
// NOTE(review): the thread also waits after its 100th print, so it can only
// return if the controller notifies once more.
g_queuecheck.wait(locker);
}
}
// Polls continuously until done becomes true. Whenever checkAll() reports that
// no flag is raised, wakes every thread blocked on g_queuecheck.
// Does not take m; flags and done are read without synchronization.
void controller(){
    for (;;) {
        if (done) {
            return;
        }
        if (checkAll()) {
            continue;   // at least one printer still has its flag raised
        }
        g_queuecheck.notify_all();
    }
}
int main(int argc, char *argv[]){
thread t0(controller);
thread t1(printer,0);
thread t2(printer,1);
thread t3(printer,2);
thread t4(printer,3);
t4.join();
t3.join();
t2.join();
t1.join();
done = true;
t0.join();
return 0;
}
But that doesn't work either. So, here are my questions: Is there a simple way of doing it, like in the first code? If not, what am I doing wrong in the second one? Thanks a lot.
Upvotes: 0
Views: 654
Reputation: 2011
Your examples do not work, because there are race conditions in updating and checking the flags array.
It seems that what you want, though, is a well-known synchronization primitive called a barrier. This can be implemented, for example, using semaphores. See section 3.6 of The Little Book of Semaphores for details on how this works.
With a barrier your code can be written concisely as:
// Number of printer threads; also the participant count passed to the barrier.
const int nThreads = 4;
// Number of lines each thread prints.
const int nIter = 100;
// Serializes writes to cout so that lines from different threads do not interleave.
mutex m;
// Rendezvous point shared by all printer threads.
// NOTE: the object has the same name as its class, so after this declaration the
// unqualified name `barrier` refers to this variable, not to the type.
barrier barrier(nThreads);
void printer(int id) {
for (int i = 0; i < nIter; i++) {
{
lock_guard<mutex> lock(m); // lock to prevent interleaved console output
cout << "Thread: " << id << " printing " << i << endl;
}
barrier.wait();
}
}
// Launches nThreads printer threads with ids 0..nThreads-1 and joins them all.
int main(int argc, char **argv) {
    vector<thread> ts;
    for (int id = 0; id < nThreads; ++id) {
        // Construct the thread directly inside the vector.
        ts.emplace_back(printer, id);
    }
    for (thread &worker : ts) {
        worker.join();
    }
    return 0;
}
Below is a simple semaphore implementation (copied from here).
// Counting semaphore built from a mutex and a condition variable.
class semaphore {
public:
    // count_: initial value of the counter (defaults to 0).
    semaphore(int count_ = 0) : count(count_) {}

    // Increments the counter and wakes one thread blocked in wait(), if any.
    void notify()
    {
        lock_guard<mutex> guard(mtx);
        ++count;
        cv.notify_one();
    }

    // Blocks while the counter is zero, then decrements it.
    void wait()
    {
        unique_lock<mutex> guard(mtx);
        // The predicate form re-checks the counter after every wakeup.
        cv.wait(guard, [this] { return count != 0; });
        --count;
    }

private:
    mutex mtx;              // protects count
    condition_variable cv;  // signalled by notify()
    int count;
};
Using that, you can implement a barrier as in the referenced book:
// Reusable barrier for n threads, built from two semaphore "turnstiles".
//
// wait() runs two phases. In phase 1 each arriving thread increments count; the
// thread that brings it to n opens turnstile1 n times, letting everybody pass.
// In phase 2 each thread decrements count; the thread that brings it back to 0
// opens turnstile2 n times. The second phase returns the barrier to its initial
// state, so it can be used again in the next loop iteration.
class barrier {
public:
    // n: number of threads that must call wait() before any of them proceeds.
    barrier(int n): n(n), count(0) {}

    // Blocks until n threads have called wait(), then releases all of them.
    void wait() {
        phase1();
        phase2();
    }

private:
    mutex m;                            // protects count
    semaphore turnstile1, turnstile2;   // both start at 0 (closed)
    int n, count;

    // Arrival: the last thread to arrive opens turnstile1 for all n threads.
    void phase1() {
        {
            // Scoped lock: m is released on every path out of this block and
            // before blocking on the turnstile.
            lock_guard<mutex> guard(m);
            count++;
            if (count == n) {
                for (int i = 0; i < n; i++)
                    turnstile1.notify();
            }
        }
        turnstile1.wait();
    }

    // Departure: the last thread to leave opens turnstile2 for all n threads.
    void phase2() {
        {
            lock_guard<mutex> guard(m);
            count--;
            if (count == 0) {
                for (int i = 0; i < n; i++)
                    turnstile2.notify();
            }
        }
        turnstile2.wait();
    }
};
Upvotes: 2
Reputation: 4925
Well, this does what you want. It worked without the atomic
but I figured what the heck, throw it in anyway. :)
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
// Number of printer threads; thread ids run from 0 to num_threads - 1.
const size_t num_threads = 10;
// Upper bound of the per-thread loop in printer.
const size_t num_reps = 10;
// Locked by printer around the turn check and the print; main also holds it
// while the threads are being created.
std::mutex m;
// Id of the thread that is allowed to print next.
std::atomic_int pos;
void printer(int id)
{
for(int i = 0; i < num_reps; ++i)
{
std::unique_lock<std::mutex> l(m);
if(pos.load() == id)
{
std::cout << "Thread: " << id << " printing " << i << std::endl;
pos.exchange((pos.load() + 1) % num_threads);
}
}
}
int main()
{
m.lock();
pos.store(0);
std::vector<std::thread> v;
for(int i = 0; i < num_threads; ++i)
{
v.emplace_back(std::thread(printer, i));
}
m.unlock();
bool done;
do
{
done = true;
for(int i = 0; i < num_threads; ++i)
{
if(v[i].joinable())
{
done = false;
v[i].join();
}
}
}
while(!done);
return 0;
}
Upvotes: 0