MZHm

Reputation: 2548

C++11 joining the first thread that finished

Is there a way to start two (or more) C++11 threads and join() the first one that is finished?

An example scenario:

#include <iostream>
#include <thread>

using namespace std;    
void prepare_item1() {std::cout << "Preparing 1" << std::endl;}    
void consume_item1() {std::cout << "Consuming 1" << std::endl;}    
void prepare_item2() {std::cout << "Preparing 2" << std::endl;}    
void consume_item2() {std::cout << "Consuming 2" << std::endl;}

int main()
{
    std::thread t1(prepare_item1);
    std::thread t2(prepare_item2);

    t1.join();
    consume_item1();

    t2.join();
    consume_item2();

    return 0;
}

I would like to do something like this instead:

int main()
{
    std::thread t1(prepare_item1);
    std::thread t2(prepare_item2);    

    int finished_id = join_any(t1, t2);  // hypothetical: blocks until either thread finishes
    if (finished_id==1)
    {
        consume_item1();
        ...
    }
    else if (finished_id==2)
    {
        consume_item2();
        ...
    }

    return 0;
}

Also, I would like the solution to be blocking, similar to the t.join() function.

Note: The real reason I need this is that I have two different blocking functions from which I receive commands, and whenever any of them is ready I would like to process the first command that arrives and continue to the next one when it is done. (sequential processing of commands from two parallel sources)

Thanks!

Upvotes: 2

Views: 394

Answers (1)

Yakk - Adam Nevraumont

Reputation: 275966

Here is a thread-safe multi-producer multi-consumer queue:

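#include <atomic>
#include <condition_variable>
#include <deque>
#include <experimental/optional>
#include <mutex>
#include <utility>

template<class T>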
struct safe_queue {
  std::deque<T> data;
  std::atomic<bool> abort_flag{false}; // brace-init: "= false" does not compile before C++17
  std::mutex guard;
  std::condition_variable signal;

  template<class...Args>
  void send( Args&&...args ) {
    {
      std::unique_lock<std::mutex> l(guard);
      data.emplace_back(std::forward<Args>(args)...);
    }
    signal.notify_one();
  }
  void abort() {
    abort_flag = true;   // 1a
    { std::unique_lock<std::mutex> l(guard); } // lock and release so a waiter cannot miss the flag between its check and its wait
    signal.notify_all(); // 1b
  }        
  std::experimental::optional<T> get() {
    std::unique_lock<std::mutex> l(guard);
    signal.wait( l, [this]()->bool{ // 2b
      return !data.empty() || abort_flag.load(); // 2c
    });
    if (abort_flag.load()) return {};
    T retval = std::move(data.front());
    data.pop_front();
    return retval;
  }
};

Have the threads push data into the queue, and have the main thread call .get() on it.

If abort() is called, all waiting threads are woken up with an "empty" value from .get().

It uses std::experimental::optional, but you can replace that with something else (throw on abort? Whatever).
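As one possible replacement on a plain C++11 compiler, here is a minimal sketch where get() reports abort through a bool return value and hands the item back through an out-parameter. The name bool_queue and this shape of get() are my own assumptions, not part of the code above, and I use a mutex-guarded bool instead of the atomic flag to keep it short.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical C++11-only variant: no optional, abort is signalled
// by get() returning false.
template<class T>
struct bool_queue {
  std::deque<T> data;
  bool aborted = false;            // only touched while holding `guard`
  std::mutex guard;
  std::condition_variable signal;

  template<class... Args>
  void send(Args&&... args) {
    {
      std::lock_guard<std::mutex> l(guard);
      data.emplace_back(std::forward<Args>(args)...);
    }
    signal.notify_one();
  }

  void abort() {
    {
      std::lock_guard<std::mutex> l(guard);
      aborted = true;
    }
    signal.notify_all();
  }

  // Blocks until an item arrives or abort() is called.
  // Returns true and fills `out` on success, false on abort.
  bool get(T& out) {
    std::unique_lock<std::mutex> l(guard);
    signal.wait(l, [this] { return !data.empty() || aborted; });
    if (aborted) return false;
    assert(!data.empty());         // the wait predicate guarantees this
    out = std::move(data.front());
    data.pop_front();
    return true;
  }
};
```

A consumer loop then becomes `T item; while (q.get(item)) { /* process item */ }`, which ends as soon as some thread calls abort().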

Code modified slightly from this other answer. Note that I think the other answer has some errors in it, which I corrected above, and attempts to solve a different problem.

The message you send could be the id of the thread that is ready to be waited upon, for example, or the work it has completed, or whatever.
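To make the thread-id idea concrete, here is a rough sketch of how it could stand in for the join_any() from the question. Everything named here is hypothetical: id_queue is a stripped-down, int-only queue without abort support, and consume_in_completion_order is just a demo driver. The comments mark where the real prepare and consume calls would go.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Minimal blocking queue of ints (hypothetical helper, C++11 only).
struct id_queue {
  std::deque<int> data;
  std::mutex guard;
  std::condition_variable signal;

  void send(int id) {
    {
      std::lock_guard<std::mutex> l(guard);
      data.push_back(id);
    }
    signal.notify_one();
  }

  // Blocks until some producer has sent an id.
  int get() {
    std::unique_lock<std::mutex> l(guard);
    signal.wait(l, [this] { return !data.empty(); });
    int id = data.front();
    data.pop_front();
    return id;
  }
};

// Starts `count` workers; each one would prepare its item and then
// report its id. Returns the ids in the order the caller received them.
std::vector<int> consume_in_completion_order(int count) {
  id_queue done;
  std::vector<std::thread> workers;
  for (int id = 1; id <= count; ++id) {
    workers.emplace_back([&done, id] {
      // ... the blocking prepare step for item `id` would run here ...
      done.send(id);
    });
  }

  std::vector<int> order;
  for (int i = 0; i < count; ++i) {
    int finished_id = done.get();  // blocks until any worker reports
    order.push_back(finished_id);  // the consume step for finished_id goes here
  }

  // Every worker has already reported, so these joins return promptly.
  for (auto& t : workers) t.join();
  assert(done.data.empty());
  return order;
}
```

The order of the returned ids depends on scheduling, so it can differ from run to run. The consuming itself stays sequential on the calling thread, which is what the question asks for.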

Upvotes: 2
