Reputation: 1096
In Rust, I have a bunch of async functions that I want to execute in parallel. The order in which the results of these functions are handled is important. I also want to retrieve the results of these functions as they become available.
Let me explain poorly.
Here is the description of FuturesOrdered
:
This "combinator" is similar to FuturesUnordered, but it imposes an order on top of the set of futures. While futures in the set will race to completion in parallel, results will only be returned in the order their originating futures were added to the queue.
So far so good. Now look at this example:
let mut ft = FuturesOrdered::new();
ft.push(wait_n(1)); // wait_n sleeps
ft.push(wait_n(2)); // for the given
ft.push(wait_n(4)); // number of secs
ft.push(wait_n(3));
ft.push(wait_n(5));
let r = ft.collect::<Vec<u64>>().await;
Since FuturesOrdered
awaits until all futures are completed; this is what I get:
|--| ++
|----| ++
|--------| ++
|------| ++
|----------|++
++-> all results available here
This is what I want:
|--|++
|----|++
|--------|++
|------| ++
|----------| ++
In other words; I want to await on the next future; as the remaining futures keep racing to completion. Also note that even though task #4 was completed before task #3; it was handled after #3 because of the initial order.
How can I get a stream of futures that are executed concurrently like this? I was hoping for something like this:
let mut ft = MagicalStreamOfOrderedFutures::new();
ft.push(wait_n(1));
ft.push(wait_n(2));
ft.push(wait_n(4));
ft.push(wait_n(3));
ft.push(wait_n(5));
while Some(result) = ft.next().await {
// returns results in order at seconds 1,2,4,4,5
}
Upvotes: 0
Views: 1977
Reputation: 42272
Since FuturesOrdered awaits until all futures are completed
It does not inherently do that.
You're asking it to because you're collect
-ing to a Vec
. Since the entire point of StreamExt::collect
is to convert the entire stream into a collection:
Transforms a stream into a collection, returning a future representing the result of that computation. The returned future will be resolved when the stream terminates.
It can only yield the collection once all the futures have settled.
If you access the stream lazily, it'll yield items as they become available:
let mut s = stream::FuturesOrdered::new();
s.push(future::lazy(|_| 1).boxed());
s.push(future::lazy(|_| panic!("never resolves")).boxed());
let f = s.next().await;
println!("{:?}", f);
works just fine, despite it not being possible for the second future to resolve. If you try to collect
it, it'll panic.
How can I get a stream of futures that are executed concurrently like this? I was hoping for something like this:
let mut s = stream::FuturesOrdered::new();
s.push(sleep(Duration::from_millis(100)));
s.push(sleep(Duration::from_millis(200)));
s.push(sleep(Duration::from_millis(400)));
s.push(sleep(Duration::from_millis(300)));
s.push(sleep(Duration::from_millis(500)));
let start = Instant::now();
while s.next().await.is_some() {
println!("{:.2?}", Instant::now() - start);
}
101.49ms
200.98ms
400.94ms
400.96ms
501.40ms
(using millisecond sleeps because multi-second sleep tends to trip the playground's timeout)
Upvotes: 9