fabian
fabian

Reputation: 1881

How can I syncronize these two threads properly?

I would like to synchronize different threads properly but so far I have only be able to write an inelegant solution. Can somebody kindly point out how I can improve the following code?

typedef void (*func)();

void thread(func func1, func func2, int& has_finished, int& id) {
    has_finished--;
    func1();
    has_finished++;
    while (has_finished != 0) std::cout << "thread " << id << " waiting\n";
    std::cout << "thread" << id << "resuming\n";
    func2();
}

int main() {
    int has_finished(0), id_one(0), id_two(1);
    std::thread t1(thread, fun, fun, std::ref(has_finished), std::ref(id_one));
    std::thread t2(thread, fun, fun, std::ref(has_finished), std::ref(id_two));
    t1.join();
    t2.join();
};

The gist of the program is described by the function thread. The function is executed by two std::threads. The function accepts two long-running functions func1 and func2 and two references of ints as arguments. The threads should only invoke func2 after all threads exited func1. The argument has_finished is used to coordinate the different threads: Upon entering the function, has_arguments is zero. Then each std::thread decrements the value and invokes the long-running function func1. After having left func1, has_finished is incremented again. As long as this value is not at its original value of zero a thread waits. Then, each thread works on func2. The main function is shown at the end.

How can I coordinate the two threads better? I was thinking of using a std::mutex and std::condition_variable but could not figure out how to use them properly? Does somebody have any idea how I can improve the program?

Upvotes: 0

Views: 427

Answers (2)

Omnifarious
Omnifarious

Reputation: 56108

The method you've chosen won't actually work and results in undefined behavior because of the race conditions. As you surmised, you need a condition variable.

Here is a Gate class demonstrating how to use a condition variable to implement a gate that waits for some number of threads to arrive at it before continuing:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <sstream>
#include <utility>
#include <cassert>

struct Gate {
 public:
    explicit Gate(unsigned int count = 2) : count_(count) { }  // How many threads need to reach the gate before it unlocks
    Gate(Gate const &) = delete;
    void operator =(Gate const &) = delete;

    void wait_for_gate();

 private:
    int count_;
    ::std::mutex count_mutex_;
    ::std::condition_variable count_gate_;
};

void Gate::wait_for_gate()
{
    ::std::unique_lock<::std::mutex> guard(count_mutex_);
    assert(count > 0); // Count being 0 here indicates an irrecoverable programming error.
    --count_;
    count_gate_.wait(guard, [this](){ return this-> count_ <= 0; });
    guard.unlock();
    count_gate_.notify_all();
}

void f1()
{
    ::std::ostringstream msg;
    msg << "In f1 with thread " << ::std::this_thread::get_id() << '\n';
    ::std::cout << msg.str();
}

void f2()
{
    ::std::ostringstream msg;
    msg << "In f2 with thread " << ::std::this_thread::get_id() << '\n';
    ::std::cout << msg.str();
}

void thread_func(Gate &gate)
{
    f1();
    gate.wait_for_gate();
    f2();
}

int main()
{
    Gate gate;
    ::std::thread t1{thread_func, ::std::ref(gate)};
    ::std::thread t2{thread_func, ::std::ref(gate)};
    t1.join();
    t2.join();
}

Hopefully the structure of this code looks enough like your code that you can understand what's going on here. From reading your code, it seems like you're looking for all threads to execute func1, then func2. You do not want func2 running while any thread is executing func1.

That can be thought of as a gate where all the threads are waiting to arrive at the 'finished func1' location before moving on to run func2.

I tested this code on my own local version of compiler explorer.

The main disadvantage of the latch in the other answer is that it is not yet standard C++. My Gate class is a simple implementation of the latch class mentioned in the other answer, and it is standard C++.

The basic way a condition variable works is that it unlocks a mutex, waits for a notify, then locks that mutex and tests the condition. If the condition is true, it continues without unlocking the mutex. If the condition is false, it starts over again.

So, after the condition variable says the condition is true, you have to do whatever you need to do, then unlock the mutex and notify everybody that you've done it.

The mutex here is guarding the shared count variable. Whenever you have a shared value you should guard it with a mutex so that no thread can see that value in an inconsistent state. The condition is that threads can wait for that count to reach 0, indicating that all threads have decremented the count variable.

Upvotes: 2

Kerrek SB
Kerrek SB

Reputation: 477640

Don't write this yourself. This kind of synchronization is known as a "latch" (or more generally a "barrier", and it's available through various libraries and through the C++ Concurrency TS. (It might also make it into C++20 in some form.)

For example, using a version from Boost:

#include <iostream>
#include <thread>

#include <boost/thread/latch.hpp>

void f(boost::latch& c) {
    std::cout << "Doing work in round 1\n";
    c.count_down_and_wait();
    std::cout << "Doing work in round 2\n";
}

int main() {
    boost::latch c(2);

    std::thread t1(f, std::ref(c)), t2(f, std::ref(c));
    t1.join();
    t2.join();
}

Upvotes: 5

Related Questions