Aditya Hegde
Aditya Hegde

Reputation: 61

Wait for the first future to complete in C++

Say I have two futures corresponding to two parallelly executed computation. How do I wait until the first future is ready? Ideally, I'm looking for an API similar to Python asyncio's wait with the parameter return_when=FIRST_COMPLETED.

However, as far as I can gather, the current C++ std::future functionality seems to not offer such an API. In that case, I'm looking for an efficient/correct method to poll for the future's status.

In case futures aren't the right tool for this task, I'm open to solutions using threads and conditional variables.

Upvotes: 3

Views: 2740

Answers (3)

LB--
LB--

Reputation: 2928

There is currently no standard library support to efficiently wait for the earliest readiness of multiple std::future instances. However, there are multiple ways you can implement such functionality manually without having to resort to inefficient polling in a loop.

The ideal solution is to refactor your concurrency code to include alternative ways of signalling result readiness in addition to the usual std::future so that you can have a shared signal endpoint that can be notified by multiple threads. I will share a C++20 approach using std::atomic (and optionally std::counting_semaphore) and a C++11 approach using std::call_once.

If you cannot refactor the code that provides the std::future instances, both of these approaches can also still be used. I'll discuss that further down in this answer, as well as provide example code for that situation.

The C++20 approach

In addition to each thread signaling its result through separate std::future instances, you can also have the threads all try to store into the same std::atomic instance upon completion, which the waiting thread can wait on and then load from to determine which std::future to check. This allows you to pick from two different possible behaviors:

  • If you specifically want only the fastest thread, you would have each thread use compare_exchange_strong with the expected value being the not-yet-ready sentinel value. When a losing thread tries to store the value, the compare_exchange_strong call will rightfully fail, and it can surmise it lost the race.
  • If you want to allow a slower thread to sneak in its result at the last moment between the waiting thread being notified and loading the value, then you would have the racing threads use an ordinary atomic store instead so they can overwrite each other's values. This might be useful behavior when a later thread can have a "better" value than an earlier thread, although it's only a micro-optimization compared to the first approach.

Regardless of which of these two approaches you use, you need to use notify_one or notify_all on the atomic after updating its value so that the original thread can be awoken from its wait call.

As for what template argument to use for std::atomic and what sentinel value to use, you actually have options there as well:

  • std::atomic<std::future<T>*> with a sentinel of nullptr allows you to directly get the pointer of the std::future instance that should be examined. This can be convenient when all the std::future instances are of the same type. You can also instead use a pointer to some wrapping structure that contains the std::future and associated materials. In fact, you don't even have to use std::future at all.
  • std::atomic_size_t with a sentinel of std::string::npos allows you to treat the result as an array index. This can be useful when you need to check on other materials associated with the winning thread when you're using parallel arrays.
  • std::atomic<SomeEnum> with a sentinel being one of the enumerations allows you to switch on the value to determine what to do. This can be useful when each std::future is of a different type and/or you know how many there will be at compile time.

Note however that wait on a std::atomic does not support providing a timeout - that functionality is coming in a future C++ standard. If you need to wait with a timeout, you can also throw in a std::counting_semaphore which is initialized to zero and released by each thread, so that the waiting thread can use try_acquire_for. Then it can load from the std::atomic to find out which std::future to examine. If you are using the compare_exchange_strong approach, you can get away with a std::binary_semaphore instead, since you can be assured only one thread will discover itself to be the victor.

Additionally, exercise caution with exceptions: if all of the threads fail by exception before signaling a result, you end up in an endless wait situation. One fix for this is to have a std::atomic counting the number of failed threads, and when it reaches the total thread count, take some appropriate action to allow the application to progress and handle the error state. One way to handle this is for that last thread to signal victory as normal and let the waiting thread receive its exception.

If you want to support cancelling the losing threads, that can be trivially done with use of std::jthread and std::stop_token. Just make sure the std::jthread destructors run before any of the other object destructors, and note that the std::jthread destructor (or the std::future destructor from std::async) will block until the losing thread gets to a stopping point and finishes calling all its own destructors. You might consider moving cleanup duties to a background thread.

If you're using a threadpool, you can directly use std::stop_source and std::stop_token without std::jthread. However, exercise caution: a race condition exists where a losing thread may have stored a value in its std::promise connected to your std::future, but not yet signaled the std::atomic or the optional semaphore. Therefore, calling wait on the std::future is not sufficient to delay destruction of the std::atomic and optional semaphore. You will need additional synchronization since you cannot use set_value_at_thread_exit in a threadpool. An easy option is to just host all the shared state in a std::shared_ptr so it automatically lives as long as necessary.

If you want to avoid the dynamic memory allocation associated with std::stop_source/std::stop_token, you can cheat by checking if the std::atomic no longer contains the sentinel value, or by using std::counting_semaphore::try_acquire followed by release if successful, since both of these are indications that another thread already won. In the case of std::counting_semaphore this also means you can wait on cancellation with a timeout, but this isn't particularly useful in most cases.

The C++11 approach

Each thread should receive a reference to a single std::once_flag and callback, both of which are shared by all the threads. Use of std::future is optional in this case, since the result can just be passed directly to the callback, which can itself use std::promise/std::future if desired. When a thread completes and is ready to transmit its result, it can then use std::call_once, which handles the race synchronization automatically and ensures only the first thread gets to invoke the callback. The callback can do whatever it wants, either utilizing the value immediately or forwarding it to a std::promise connected to a single std::future that your original thread can trivially wait on.

Note however that the losing threads will not be able to terminate until the winning thread's callback returns. It is also possible for the callback to complete before the losing threads attempt to call it, but in that case only the std::once_flag needs to outlive the losing threads, since callback cannot be invoked. Still, you should try to ensure the callback outlives the losing threads as well.

Additionally, exercise caution with exceptions: if the callback exits via exception, the std::once_flag will be unset and unlocked for a losing thread to try invoking the callback again, which may have unintended consequences. Worse, if all of the threads fail by exception before invoking the callback, you end up in an endless wait situation. One fix for this is to have a std::atomic counting the number of failed threads, and when it reaches the total thread count, take some appropriate action to allow the application to progress and handle the error state.

If you want to support cancelling the losing threads, a quick and dirty way is to use a single std::atomic_bool that all the threads can read from to check for cancellation, and the waiting thread can set its value to inform them of that. Remember you must explicitly call .join() on each std::thread instance (or the std::future returned from std::async might do that implicitly), which will block until the losing thread gets to a stopping point and finishes calling all its own destructors. You might consider moving cleanup duties to a background thread.

If you're using a threadpool, then you will need additional synchronization since the std::once_flag and callback need to live long enough for all the losing threads to be done with them. An easy option is to just host all the shared state in a std::shared_ptr so it automatically lives as long as necessary.

Be cautious with threadpools

If you're using a threadpool, remember that you might need to do something special to get it to create additional threads instead of just holding your work in a queue while earlier work executes. Forgetting this can result in one of the would-be-winners getting stuck in the queue and not executing while the slower jobs end up "winning" the race due to getting to go first. Check your threadpool library's documentation for information on how to do this. Sometimes you do this by informing it that the work can take a long time to run, other times you can indicate you really want all the work to start in parallel without being queued. In some cases it may be better to just create dedicated threads of your own so as not to upset the balance of a threadpool.

Adapting to code you can't modify

If you are working with an existing library or API that just gives you std::future instances with no way to add any form of callback to be executed on the threads as they finish, you will have to create your own threads and utilize one of the above approaches, with the "work" being to simply wait on their designated std::future instances from that other library. Doing so is wasteful in terms of memory usage and thread count (since every thread the library makes is mirrored by one of your own threads), but you may find it necessary for optimal performance or power efficiency compared to polling.

Aditya Hegde's answer is an example implementation of this concept that works in C++11, though it may be less optimal than my proposed C++11 approach on some platforms.

Because the "work" in this case is to just call wait on that other library's std::future instances, you might be able to get away with using my suggested approaches without worrying about the exception handling edge cases I pointed out previously, depending on the exception promises of wait in the standard library implementation you're using. You could therefore utilize my suggested approaches without needing to create any extra std::promise/std::future instances, since you'd just be working with the ones from that other library.

Demo

Here is an example C++20 program that solves your original problem:

#include <atomic>
#include <future>
#include <thread>
#include <vector>

namespace some_library
{
    class BackgroundProcessor
    {
        std::vector<std::jthread> threads;
    public:
        std::future<int> process(int param)
        {
            std::packaged_task<int (int)> task([](int param)
            -> int
            {
                std::this_thread::sleep_for(std::chrono::seconds(1) + std::chrono::milliseconds(param));
                return param;
            });
            std::future<int> f(task.get_future());
            threads.emplace_back(std::move(task), param);
            return f;
        }
    };
}

namespace your_code
{
    class BackgroundProcessorHelper
    {
        static constexpr std::size_t RACE_IN_PROGRESS{(std::size_t)-1};
        std::atomic_size_t victor{RACE_IN_PROGRESS};
        some_library::BackgroundProcessor processor;
        struct Waiter
        {
            std::future<int> f;
            std::jthread thread{};
        };
        std::vector<Waiter> waiters;
    public:
        int waitFirstOfThree(int paramA, int paramB, int paramC)
        {
            waiters.emplace_back(processor.process(paramA));
            waiters.emplace_back(processor.process(paramB));
            waiters.emplace_back(processor.process(paramC));
            for(std::size_t i{0}; i < std::size(waiters); ++i)
            {
                Waiter& waiter{waiters.at(i)};
                waiter.thread = std::jthread([&f = waiter.f, i, &victor = victor]
                {
                    f.wait();
                    std::size_t expected{RACE_IN_PROGRESS};
                    if(victor.compare_exchange_strong(expected, i))
                    {
                        victor.notify_all();
                    }
                });
            }
            victor.wait(RACE_IN_PROGRESS);
            std::size_t const victorIndex{victor.load()};
            return waiters.at(victorIndex).f.get();
        }
    };
}

int main()
{
    your_code::BackgroundProcessorHelper helper;
    return helper.waitFirstOfThree(65, 21, 43);
}

Demo: https://compiler-explorer.com/z/6a7cY3PGr

Cancelling the wait

Unfortunately, everything discussed so far in this answer is just a workaround for a lack of a proper way to wait on the first of multiple events in C++. Therefore, there is currently no efficient way to simultaneously wait on another library's std::future and your own cancellation source. You will have to use wait_for in a loop to wait with a timeout based on how responsive you want cancellation to be, at the cost of power efficiency. If you're lucky, the library you're using may have its own cancellation mechanism, saving you from this hassle. If you want a real proper solution though, you'll have to do it yourself, either by modifying the library you are using or dropping it and rolling your own.

When you have full control over the entire program, you can use tricks like the C++20 std::atomic functionality to wait for the value to change and to notify waiters. Therefore, you could have a std::atomic be used to both signal a successful wait or a cancelled wait, and its value can allow you to act accordingly. However, this means you can't use std::future at all, you'll have to re-invent it yourself. You also can't wait with a timeout until a later C++ standard, unless you re-invent that too by having another thread sleep and cancel the wait, but that's difficult to do correctly. I'd advise against designs that require wait timeouts anyway.

If you can't use C++20, some operating systems have built-in support for waiting for the first victor among multiple threads or future-like things in optimal ways, such as Windows' WaitForMultipleObjects family of APIs. Using operating system APIs might be slower than the C++20 approach on some architectures depending on your standard library implementation and target use cases, but it will allow you to correctly wait for both a desired event and a cancellation event in an optimal way. For Windows, that typically means creating event objects that can be signaled by the threads and waited upon by the WaitForMultipleObjects family of APIs. Since you get information about which event caused the wait to end, you can respond in the correct way, and you don't need to use any timeouts if you don't want to. Be wary however, these Windows APIs have a rather low limit on the number of things that can be waited upon in a single call, so you might need to break up the list into separate chunks that are sent to separate threads to wait on, and then wait on those results recursively.

Really though, you should probably just ask the library authors to implement the ability to cancel the operations, so you can use the original example code from earlier in this answer and save yourself a lot of trouble.

Upvotes: 2

Aditya Hegde
Aditya Hegde

Reputation: 61

I've tried to come up with a function that takes two futures f1 and f2 and returns a new future which is resolved to either f1 or f2 depending on which completes first.

The function assumes that the get() method is not called on f1 and f2 anywhere (else we'll need to use std::shared_future).

#include <unistd.h>

#include <array>
#include <future>
#include <iostream>
#include <thread>

template <typename T>
class WaitFirstData {
 public:
  T res;
  bool res_ready;
  std::condition_variable res_cv;
  std::mutex res_mutex;

  WaitFirstData() : res_ready{false} {}
};

template <typename T>
std::future<T> wait_first(std::future<T> f1, std::future<T> f2) {
  auto data = std::make_shared<WaitFirstData<T>>();

  std::promise<T> wait_first_promise;
  std::future<T> wait_first_future = wait_first_promise.get_future();

  std::thread wait_first_thread{
      [](std::promise<T> p, std::shared_ptr<WaitFirstData<T>> data) {
        std::unique_lock<std::mutex> lk(data->res_mutex);
        data->res_cv.wait(lk, [&]() { return data->res_ready; });

        p.set_value(data->res);
      },
      std::move(wait_first_promise), data};

  auto thread_lambda = [](std::future<T> f,
                          std::shared_ptr<WaitFirstData<T>> data) {
    T r = f.get();

    std::lock_guard<std::mutex> lk(data->res_mutex);
    if (!data->res_ready) {
      data->res_ready = true;
      data->res = r;
      data->res_cv.notify_one();
    }
  };

  std::thread t1{thread_lambda, std::move(f1), data},
      t2{thread_lambda, std::move(f2), data};

  wait_first_thread.detach();
  t1.detach();
  t2.detach();

  return wait_first_future;
}

// arbitrary function denoting a potentially long-running task
int func(int n) {
  sleep(n);
  return n;
}

int main() {
  auto f1 = std::async(std::launch::async, func, 2);
  auto f2 = std::async(std::launch::async, func, 15);

  // f3 is resolved as soon as either f1 or f2 completes (with the same return
  // value too)
  auto f3 = wait_first(std::move(f1), std::move(f2));

  // wait for f3 to resolve
  int l = f3.get();

  // computation (here I/O) done after the first among f1 and f2 is completed
  std::cout << "Task done after " << l << " seconds" << std::endl;

  return 0;
}

Upvotes: 0

Alex Guteniev
Alex Guteniev

Reputation: 13634

You can use std::experimental::future method then if your computation is sequential (one depends on other), if you have access to std::experimental::future (and not afraid of using "experimental")

You can also pass one future as parameter to second computation right away. future::get is a way to wait for future to complete. Use std::shared_future, if you need to pass the first future to somewhere else.

Upvotes: 1

Related Questions