Finlay Weber

Reputation: 4163

How to race a collection of futures in Rust, with retry

I have a collection of futures. I would like to run all of them, take the first one that resolves successfully, and abort the others that are still in progress.

I also want to handle the case where the first future to resolve returns an invalid value, so that a retry is needed.

I found the select! macro from tokio, but it does not support racing a collection of futures. With select! one has to list the raced futures explicitly, which makes it unusable for my use case. I also do not see it supporting any retry mechanism.

So how do I race a collection of futures in Rust, with retry?

Upvotes: 0

Views: 421

Answers (1)

Chayim Friedman

Reputation: 71430

If your futures return Result and you need to retry on Err, and by "retry" you don't mean re-running the failed future but trying the others, you can use select_ok() from the futures crate:

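use std::fmt;
use std::future::Future;

// Resolves to the first Ok value; panics if every future returns Err
// (select_ok itself also panics when given an empty collection).
async fn first_successful<T, E, Fut>(futures: Vec<Fut>) -> T
where
    E: fmt::Debug,
    Fut: Future<Output = Result<T, E>> + Unpin,
{
    // select_ok moves on to the remaining futures whenever one returns Err.
    let (output, _remaining_futures) = futures::future::select_ok(futures)
        .await
        .expect("all futures failed");
    // The remaining futures are dropped on return, so they are not polled again.
    output
}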

If not, and you need more control, you can use the powerful FuturesUnordered. For example, for trying others with a custom predicate:

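use std::future::Future;

use futures::stream::StreamExt;

// Resolves to the first output that `failed` does not reject.
async fn first_successful<Fut: Future + Unpin>(
    futures: Vec<Fut>,
    mut failed: impl FnMut(&Fut::Output) -> bool,
) -> Fut::Output {
    let mut futures = futures::stream::FuturesUnordered::from_iter(futures);
    // next() yields outputs in completion order, not insertion order.
    while let Some(v) = futures.next().await {
        if !failed(&v) {
            // Returning drops the stream and, with it, the unfinished futures.
            return v;
        }
    }
    // Reached when the collection was empty or every output was rejected.
    panic!("all futures failed");
}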
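
If by "retry" you do mean re-running the operation that failed, one option is to wrap each operation in its own retry loop and then race the wrapped futures with either function above. The following is only a sketch under assumptions: with_retry, block_on and demo are hypothetical names, not part of tokio or futures, and the tiny block_on executor exists only so the demo runs with the standard library alone. The operation is passed as a closure because a future cannot be polled again once it has completed.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical helper: calls `make` up to `max_attempts` times and returns
/// the first output that `failed` does not reject, or `None` if every
/// attempt is rejected.
async fn with_retry<Fut: Future>(
    mut make: impl FnMut() -> Fut,
    max_attempts: usize,
    mut failed: impl FnMut(&Fut::Output) -> bool,
) -> Option<Fut::Output> {
    for _ in 0..max_attempts {
        // Each attempt builds and awaits a fresh future.
        let v = make().await;
        if !failed(&v) {
            return Some(v);
        }
    }
    None
}

// Minimal std-only executor, used here only to run the demo without a runtime.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(v) => return v,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Demo: the operation yields 1, 2, 3, ... and values below 3 count as invalid.
/// Returns the accepted value and the number of attempts made.
fn demo() -> (Option<u32>, u32) {
    let mut calls = 0;
    let result = block_on(with_retry(
        || {
            calls += 1;
            std::future::ready(calls)
        },
        5,
        |v: &u32| *v < 3,
    ));
    (result, calls)
}
```

The future returned by an async fn is not Unpin, so to pass these to the first_successful above you would wrap each one in Box::pin(...) and use a predicate such as |v| v.is_none() to reject an operation that ran out of attempts.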

Upvotes: 3
