Reputation: 61
Say I have two futures corresponding to two computations executed in parallel. How do I wait until the first future is ready? Ideally, I'm looking for an API similar to Python asyncio's wait with the parameter return_when=FIRST_COMPLETED.
However, as far as I can gather, the current C++ std::future functionality does not seem to offer such an API. In that case, I'm looking for an efficient/correct method to poll the futures' status.
In case futures aren't the right tool for this task, I'm open to solutions using threads and condition variables.
Upvotes: 3
Views: 2740
Reputation: 2928
There is currently no standard library support for efficiently waiting for the earliest readiness of multiple std::future instances. However, there are multiple ways you can implement such functionality manually without having to resort to inefficient polling in a loop.
The ideal solution is to refactor your concurrency code to include alternative ways of signalling result readiness in addition to the usual std::future, so that you have a shared signal endpoint that can be notified by multiple threads. I will share a C++20 approach using std::atomic (and optionally std::counting_semaphore) and a C++11 approach using std::call_once.
If you cannot refactor the code that provides the std::future instances, both of these approaches can still be used. I'll discuss that further down in this answer, as well as provide example code for that situation.
In addition to each thread signaling its result through separate std::future instances, you can also have the threads all try to store into the same std::atomic instance upon completion, which the waiting thread can wait on and then load from to determine which std::future to check. This allows you to pick from two different possible behaviors:
- An unconditional store, where every finishing thread writes its identifier into the atomic; the waiting thread simply acts on whichever value it observes, and finishing threads don't find out whether they won the race.
- compare_exchange_strong with the expected value being the not-yet-ready sentinel value. When a losing thread tries to store the value, the compare_exchange_strong call will rightfully fail, and it can surmise it lost the race.
Regardless of which of these two approaches you use, you need to use notify_one or notify_all on the atomic after updating its value so that the original thread can be awoken from its wait call.
As for what template argument to use for std::atomic and what sentinel value to use, you have options there as well:
- std::atomic<std::future<T>*> with a sentinel of nullptr allows you to directly get the pointer of the std::future instance that should be examined (a sketch of this flavor follows the list). This can be convenient when all the std::future instances are of the same type. You can also instead use a pointer to some wrapping structure that contains the std::future and associated materials. In fact, you don't even have to use std::future at all.
- std::atomic_size_t with a sentinel of std::string::npos allows you to treat the result as an array index. This can be useful when you need to check on other materials associated with the winning thread when you're using parallel arrays.
- std::atomic<SomeEnum> with a sentinel being one of the enumerations allows you to switch on the value to determine what to do. This can be useful when each std::future is of a different type and/or you know how many there will be at compile time.
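Here is a minimal sketch of the pointer flavor, assuming int-returning futures; winner, on_worker_done, and wait_for_first are made-up names for illustration:
#include <atomic>
#include <future>

std::atomic<std::future<int>*> winner{nullptr}; // nullptr is the not-yet-ready sentinel

void on_worker_done(std::future<int>* mine)
{
    std::future<int>* expected{nullptr};
    if(winner.compare_exchange_strong(expected, mine))
    {
        winner.notify_one(); // we won; wake the waiting thread
    }
    // otherwise another thread already won the race
}

std::future<int>& wait_for_first()
{
    winner.wait(nullptr);  // blocks until the sentinel has been replaced
    return *winner.load(); // the future that is ready to examine
}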
Note however that wait on a std::atomic does not support providing a timeout - that functionality is coming in a future C++ standard. If you need to wait with a timeout, you can also throw in a std::counting_semaphore which is initialized to zero and released by each thread, so that the waiting thread can use try_acquire_for. Then it can load from the std::atomic to find out which std::future to examine. If you are using the compare_exchange_strong approach, you can get away with a std::binary_semaphore instead, since you can be assured only one thread will discover itself to be the victor.
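A minimal sketch of that combination, again with made-up names and int-returning futures:
#include <atomic>
#include <chrono>
#include <future>
#include <semaphore>

std::atomic<std::future<int>*> winner{nullptr};
std::counting_semaphore<> done{0}; // starts at zero; released once per finishing thread

void on_worker_done(std::future<int>* mine)
{
    std::future<int>* expected{nullptr};
    winner.compare_exchange_strong(expected, mine);
    done.release(); // wakes the waiter, which can use a timeout on its side
}

// Returns the winning future, or nullptr if the timeout elapsed first.
std::future<int>* wait_for_first(std::chrono::milliseconds timeout)
{
    if(!done.try_acquire_for(timeout))
    {
        return nullptr;
    }
    return winner.load();
}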
Additionally, exercise caution with exceptions: if all of the threads fail by exception before signaling a result, you end up in an endless wait situation. One fix for this is to have a std::atomic counting the number of failed threads, and when it reaches the total thread count, take some appropriate action to allow the application to progress and handle the error state. One way to handle this is for that last thread to signal victory as normal and let the waiting thread receive its exception.
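One possible shape for that fix, building on the earlier sketch; on_worker_failed and total_threads are made-up names:
#include <atomic>
#include <cstddef>
#include <future>

void on_worker_done(std::future<int>* mine); // from the earlier sketch

std::atomic_size_t failures{0};

// Each thread that fails by exception bumps the counter. The last failing
// thread deliberately "wins" so the waiter's get() rethrows the exception.
void on_worker_failed(std::future<int>* mine, std::size_t total_threads)
{
    if(failures.fetch_add(1) + 1 == total_threads)
    {
        on_worker_done(mine);
    }
}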
If you want to support cancelling the losing threads, that can be trivially done with use of std::jthread and std::stop_token. Just make sure the std::jthread destructors run before any of the other object destructors, and note that the std::jthread destructor (or the std::future destructor from std::async) will block until the losing thread gets to a stopping point and finishes calling all its own destructors. You might consider moving cleanup duties to a background thread.
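For illustration, the basic cancellable-worker shape with std::jthread looks like this (the chunked work is a stand-in):
#include <chrono>
#include <thread>

int main()
{
    std::jthread worker([](std::stop_token token)
    {
        // std::jthread passes a std::stop_token as the first argument.
        while(!token.stop_requested())
        {
            // ... do a bounded chunk of work, re-checking the token between chunks ...
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    });
    // The jthread destructor calls request_stop() and then join().
}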
If you're using a threadpool, you can directly use std::stop_source and std::stop_token without std::jthread. However, exercise caution: a race condition exists where a losing thread may have stored a value in its std::promise connected to your std::future, but not yet signaled the std::atomic or the optional semaphore. Therefore, calling wait on the std::future is not sufficient to delay destruction of the std::atomic and optional semaphore. You will need additional synchronization, since you cannot use set_value_at_thread_exit in a threadpool. An easy option is to just host all the shared state in a std::shared_ptr so it automatically lives as long as necessary.
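A sketch of that shared-state arrangement; RaceState, pool.post, and the fixed count of 3 tasks are all made-up for illustration:
#include <atomic>
#include <cstddef>
#include <future>
#include <memory>
#include <semaphore>

struct RaceState
{
    std::atomic_size_t victor{static_cast<std::size_t>(-1)};
    std::counting_semaphore<> done{0};
    std::promise<int> results[3];
};

template <typename ThreadPool>
void start_race(ThreadPool& pool, std::shared_ptr<RaceState> state)
{
    for(std::size_t i = 0; i < 3; ++i)
    {
        pool.post([state, i] // each task owns its own shared_ptr copy of the state
        {
            state->results[i].set_value(42); // stand-in for the real computation
            std::size_t expected{static_cast<std::size_t>(-1)};
            state->victor.compare_exchange_strong(expected, i);
            // Last touch of *state: safe even if the waiting thread has
            // already destroyed its own shared_ptr copy.
            state->done.release();
        });
    }
}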
If you want to avoid the dynamic memory allocation associated with std::stop_source/std::stop_token, you can cheat by checking whether the std::atomic no longer contains the sentinel value, or by using std::counting_semaphore::try_acquire followed by release if successful, since both of these are indications that another thread already won. In the case of std::counting_semaphore this also means you can wait on cancellation with a timeout, but this isn't particularly useful in most cases.
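That check is short enough to sketch in full:
#include <semaphore>

// If the semaphore can be acquired, somebody already won; put the permit
// back for the real waiter and report that the race is over.
bool race_already_won(std::counting_semaphore<>& done)
{
    if(done.try_acquire())
    {
        done.release();
        return true;
    }
    return false;
}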
As for the C++11 approach using std::call_once: each thread should receive a reference to a single std::once_flag and callback, both of which are shared by all the threads. Use of std::future is optional in this case, since the result can just be passed directly to the callback, which can itself use std::promise/std::future if desired. When a thread completes and is ready to transmit its result, it can then use std::call_once, which handles the race synchronization automatically and ensures only the first thread gets to invoke the callback. The callback can do whatever it wants, either utilizing the value immediately or forwarding it to a std::promise connected to a single std::future that your original thread can trivially wait on.
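Here is a minimal C++11 sketch of this approach; FirstWins and worker are made-up names, and the promise plays the role of the callback target:
#include <future>
#include <memory>
#include <mutex> // std::once_flag, std::call_once
#include <thread>

struct FirstWins
{
    std::once_flag flag;
    std::promise<int> result;
};

void worker(std::shared_ptr<FirstWins> shared, int value)
{
    // ... perform the actual computation producing `value` here ...
    std::call_once(shared->flag, [&]
    {
        shared->result.set_value(value); // only the first caller gets here
    });
}

int main()
{
    auto shared = std::make_shared<FirstWins>();
    std::future<int> first = shared->result.get_future();
    std::thread a(worker, shared, 1), b(worker, shared, 2);
    int winningValue = first.get(); // ready as soon as the first worker finishes
    a.join();
    b.join();
    return winningValue;
}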
Note however that the losing threads will not be able to terminate until the winning thread's callback returns. It is also possible for the callback to complete before the losing threads attempt to call it, but in that case only the std::once_flag needs to outlive the losing threads, since the callback cannot be invoked again. Still, you should try to ensure the callback outlives the losing threads as well.
Additionally, exercise caution with exceptions: if the callback exits via exception, the std::once_flag will be unset and unlocked for a losing thread to try invoking the callback again, which may have unintended consequences. Worse, if all of the threads fail by exception before invoking the callback, you end up in an endless wait situation. One fix for this is to have a std::atomic counting the number of failed threads, and when it reaches the total thread count, take some appropriate action to allow the application to progress and handle the error state.
If you want to support cancelling the losing threads, a quick and dirty way is to use a single std::atomic_bool that all the threads can read from to check for cancellation, and the waiting thread can set its value to inform them of that. Remember you must explicitly call .join() on each std::thread instance (or rely on the std::future returned from std::async, whose destructor does that implicitly), which will block until the losing thread gets to a stopping point and finishes calling all its own destructors. You might consider moving cleanup duties to a background thread.
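The quick-and-dirty flag looks like this on the worker side (the chunked work is a stand-in):
#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> cancelled(false); // set to true by the waiting thread

void losing_thread_work()
{
    while(!cancelled.load())
    {
        // ... do a bounded chunk of work, re-checking the flag between chunks ...
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}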
If you're using a threadpool, then you will need additional synchronization, since the std::once_flag and callback need to live long enough for all the losing threads to be done with them. An easy option is to just host all the shared state in a std::shared_ptr so it automatically lives as long as necessary.
If you're using a threadpool, remember that you might need to do something special to get it to create additional threads instead of just holding your work in a queue while earlier work executes. Forgetting this can result in one of the would-be-winners getting stuck in the queue and not executing while the slower jobs end up "winning" the race due to getting to go first. Check your threadpool library's documentation for information on how to do this. Sometimes you do this by informing it that the work can take a long time to run, other times you can indicate you really want all the work to start in parallel without being queued. In some cases it may be better to just create dedicated threads of your own so as not to upset the balance of a threadpool.
If you are working with an existing library or API that just gives you std::future instances with no way to add any form of callback to be executed on the threads as they finish, you will have to create your own threads and utilize one of the above approaches, with the "work" being to simply wait on their designated std::future instances from that other library. Doing so is wasteful in terms of memory usage and thread count (since every thread the library makes is mirrored by one of your own threads), but you may find it necessary for optimal performance or power efficiency compared to polling.
Aditya Hegde's answer is an example implementation of this concept that works in C++11, though it may be less optimal than my proposed C++11 approach on some platforms.
Because the "work" in this case is to just call wait on that other library's std::future instances, you might be able to get away with using my suggested approaches without worrying about the exception handling edge cases I pointed out previously, depending on the exception guarantees of wait in the standard library implementation you're using. You could therefore utilize my suggested approaches without needing to create any extra std::promise/std::future instances, since you'd just be working with the ones from that other library.
Here is an example C++20 program that solves your original problem:
#include <atomic>
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>
#include <vector>

namespace some_library
{
    class BackgroundProcessor
    {
        std::vector<std::jthread> threads;
    public:
        std::future<int> process(int param)
        {
            std::packaged_task<int (int)> task([](int param) -> int
            {
                // Simulate work whose duration depends on the parameter.
                std::this_thread::sleep_for(std::chrono::seconds(1) + std::chrono::milliseconds(param));
                return param;
            });
            std::future<int> f(task.get_future());
            threads.emplace_back(std::move(task), param);
            return f;
        }
    };
}

namespace your_code
{
    class BackgroundProcessorHelper
    {
        static constexpr std::size_t RACE_IN_PROGRESS{(std::size_t)-1};
        std::atomic_size_t victor{RACE_IN_PROGRESS};
        some_library::BackgroundProcessor processor;
        struct Waiter
        {
            std::future<int> f;
            std::jthread thread{};
        };
        std::vector<Waiter> waiters;
    public:
        int waitFirstOfThree(int paramA, int paramB, int paramC)
        {
            waiters.emplace_back(processor.process(paramA));
            waiters.emplace_back(processor.process(paramB));
            waiters.emplace_back(processor.process(paramC));
            // One mirror thread per library future: wait for readiness,
            // then race to record this waiter's index in the atomic.
            for(std::size_t i{0}; i < std::size(waiters); ++i)
            {
                Waiter& waiter{waiters.at(i)};
                waiter.thread = std::jthread([&f = waiter.f, i, &victor = victor]
                {
                    f.wait();
                    std::size_t expected{RACE_IN_PROGRESS};
                    // Only the first finisher's compare_exchange_strong succeeds.
                    if(victor.compare_exchange_strong(expected, i))
                    {
                        victor.notify_all();
                    }
                });
            }
            // Block until some thread has replaced the sentinel value.
            victor.wait(RACE_IN_PROGRESS);
            std::size_t const victorIndex{victor.load()};
            return waiters.at(victorIndex).f.get();
        }
    };
}

int main()
{
    your_code::BackgroundProcessorHelper helper;
    return helper.waitFirstOfThree(65, 21, 43);
}
Demo: https://compiler-explorer.com/z/6a7cY3PGr
Unfortunately, everything discussed so far in this answer is just a workaround for the lack of a proper way to wait on the first of multiple events in C++. Therefore, there is currently no efficient way to simultaneously wait on another library's std::future and your own cancellation source. You will have to use wait_for in a loop to wait with a timeout based on how responsive you want cancellation to be, at the cost of power efficiency. If you're lucky, the library you're using may have its own cancellation mechanism, saving you from this hassle. If you want a real proper solution though, you'll have to do it yourself, either by modifying the library you are using or dropping it and rolling your own.
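The polling fallback is straightforward; the 50ms period here is an arbitrary responsiveness/power tradeoff:
#include <atomic>
#include <chrono>
#include <future>

// Returns true if the future became ready, false if cancellation won.
template <typename T>
bool wait_or_cancel(std::future<T>& f, std::atomic<bool> const& cancelled)
{
    while(!cancelled.load())
    {
        if(f.wait_for(std::chrono::milliseconds(50)) == std::future_status::ready)
        {
            return true;
        }
    }
    return false;
}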
When you have full control over the entire program, you can use tricks like the C++20 std::atomic functionality to wait for the value to change and to notify waiters. Therefore, you could have a std::atomic be used to both signal a successful wait or a cancelled wait, and its value can allow you to act accordingly. However, this means you can't use std::future at all; you'll have to re-invent it yourself. You also can't wait with a timeout until a later C++ standard, unless you re-invent that too by having another thread sleep and cancel the wait, but that's difficult to do correctly. I'd advise against designs that require wait timeouts anyway.
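A minimal sketch of that hand-rolled signal; RaceResult, signal, and wait_for_outcome are made-up names:
#include <atomic>

enum class RaceResult { Pending, Ready, Cancelled };
std::atomic<RaceResult> state{RaceResult::Pending};

// Called by the worker with Ready, or by the canceller with Cancelled.
bool signal(RaceResult outcome)
{
    RaceResult expected{RaceResult::Pending};
    if(state.compare_exchange_strong(expected, outcome))
    {
        state.notify_one();
        return true;
    }
    return false; // the other side got there first
}

RaceResult wait_for_outcome()
{
    state.wait(RaceResult::Pending); // C++20: blocks until the value changes
    return state.load();
}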
If you can't use C++20, some operating systems have built-in support for waiting for the first victor among multiple threads or future-like things in optimal ways, such as Windows' WaitForMultipleObjects family of APIs. Using operating system APIs might be slower than the C++20 approach on some architectures depending on your standard library implementation and target use cases, but it will allow you to correctly wait for both a desired event and a cancellation event in an optimal way. For Windows, that typically means creating event objects that can be signaled by the threads and waited upon by the WaitForMultipleObjects family of APIs. Since you get information about which event caused the wait to end, you can respond in the correct way, and you don't need to use any timeouts if you don't want to. Be wary however: these Windows APIs have a rather low limit on the number of things that can be waited upon in a single call (MAXIMUM_WAIT_OBJECTS, which is 64), so you might need to break up the list into separate chunks that are sent to separate threads to wait on, and then wait on those results recursively.
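The core of the Windows approach looks roughly like this (Windows-only, not portable; workers call SetEvent on their own event, and one extra event in the array can serve as the cancellation signal):
#include <windows.h>

// events[] holds handles from CreateEventW; count must not exceed
// MAXIMUM_WAIT_OBJECTS. Returns the index of the signaled event, or -1.
int wait_first_of(HANDLE const* events, DWORD count)
{
    DWORD r = WaitForMultipleObjects(count, events, FALSE /*wait for any*/, INFINITE);
    if(r < WAIT_OBJECT_0 + count)
    {
        return static_cast<int>(r - WAIT_OBJECT_0);
    }
    return -1; // abandoned mutex or failure
}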
Really though, you should probably just ask the library authors to implement the ability to cancel the operations, so you can use the original example code from earlier in this answer and save yourself a lot of trouble.
Upvotes: 2
Reputation: 61
I've tried to come up with a function that takes two futures f1 and f2 and returns a new future which resolves to the value of whichever of f1 or f2 completes first.
The function assumes that the get() method is not called on f1 and f2 anywhere else (otherwise we'd need to use std::shared_future).
#include <unistd.h>

#include <condition_variable>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
template <typename T>
class WaitFirstData {
public:
T res;
bool res_ready;
std::condition_variable res_cv;
std::mutex res_mutex;
WaitFirstData() : res_ready{false} {}
};
template <typename T>
std::future<T> wait_first(std::future<T> f1, std::future<T> f2) {
auto data = std::make_shared<WaitFirstData<T>>();
std::promise<T> wait_first_promise;
std::future<T> wait_first_future = wait_first_promise.get_future();
std::thread wait_first_thread{
[](std::promise<T> p, std::shared_ptr<WaitFirstData<T>> data) {
std::unique_lock<std::mutex> lk(data->res_mutex);
data->res_cv.wait(lk, [&]() { return data->res_ready; });
p.set_value(data->res);
},
std::move(wait_first_promise), data};
auto thread_lambda = [](std::future<T> f,
std::shared_ptr<WaitFirstData<T>> data) {
T r = f.get();
std::lock_guard<std::mutex> lk(data->res_mutex);
if (!data->res_ready) {
data->res_ready = true;
data->res = r;
data->res_cv.notify_one();
}
};
std::thread t1{thread_lambda, std::move(f1), data},
t2{thread_lambda, std::move(f2), data};
wait_first_thread.detach();
t1.detach();
t2.detach();
return wait_first_future;
}
// arbitrary function denoting a potentially long-running task
int func(int n) {
sleep(n);
return n;
}
int main() {
auto f1 = std::async(std::launch::async, func, 2);
auto f2 = std::async(std::launch::async, func, 15);
// f3 is resolved as soon as either f1 or f2 completes (with the same return
// value too)
auto f3 = wait_first(std::move(f1), std::move(f2));
// wait for f3 to resolve
int l = f3.get();
// computation (here I/O) done after the first among f1 and f2 is completed
std::cout << "Task done after " << l << " seconds" << std::endl;
return 0;
}
Upvotes: 0
Reputation: 13634
You can use the std::experimental::future method then if your computations are sequential (one depends on the other), provided you have access to std::experimental::future (and are not afraid of using something "experimental").
You can also pass one future as a parameter to the second computation right away; future::get is a way to wait for a future to complete. Use std::shared_future if you need to pass the first future somewhere else.
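For reference, a sketch of then, assuming your toolchain ships the Concurrency TS header <experimental/future> (support varies between standard library implementations):
#include <experimental/future>
#include <iostream>

int main()
{
    std::experimental::promise<int> p;
    auto f = p.get_future();
    // The continuation runs once f becomes ready and receives the ready future.
    auto chained = f.then([](std::experimental::future<int> ready)
    {
        return ready.get() + 1;
    });
    p.set_value(41);
    std::cout << chained.get() << '\n'; // prints 42
}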
Upvotes: 1