Reputation: 1771
I have a stream which uses combinators and I need to run it to completion. I can use a while loop or the for_each combinator. Both work, but I think there must be a nicer way.
Sink looks like what I'm looking for, especially sink::drain(), but I haven't been able to understand how to use it.
Using a while loop:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    let mut stream = Box::pin(
        futures::stream::iter(0..20)
            .map(foo)
            .map_ok(|x| x * 10)
            .and_then(bar)
            .filter(|x| futures::future::ready(x.is_ok())),
    );
    while let Some(_) = stream.next().await {
        // Nothing to do here. I just need to run stream.
    }
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}
Using for_each:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .for_each(|_| futures::future::ready(())) // Nothing to do here, just to run stream
        .await;
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}
I would like to have something like the following. It's not necessary to use the drain combinator exactly, just some combinator to run the stream:
use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .forward(futures::sink::drain())
        .await;
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}
This doesn't work, probably because drain puts some bounds on the Error type:
error[E0271]: type mismatch resolving `<futures::sink::Drain<()> as futures::Sink<()>>::Error == std::string::String`
--> src/main.rs:11:10
|
11 | .forward(futures::sink::drain())
| ^^^^^^^ expected enum `std::convert::Infallible`, found struct `std::string::String`
error[E0271]: type mismatch resolving `<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]> as futures::Stream>::Item == std::result::Result<(), std::convert::Infallible>`
--> src/main.rs:6:5
|
6 | / futures::stream::iter(0..20)
7 | | .map(foo)
8 | | .map_ok(|x| x * 10)
9 | | .and_then(bar)
10 | | .filter(|x| futures::future::ready(x.is_ok()))
11 | | .forward(futures::sink::drain())
12 | | .await;
| |______________^ expected struct `std::string::String`, found enum `std::convert::Infallible`
|
= note: expected enum `std::result::Result<_, std::string::String>`
found enum `std::result::Result<_, std::convert::Infallible>`
= note: required because of the requirements on the impl of `futures::Future` for `futures_util::stream::stream::forward::Forward<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]>, futures::sink::Drain<()>, ()>`
Upvotes: 0
Views: 2265
Reputation: 1537
You can use collect::<()>() to run the stream to completion. Example:
use futures::StreamExt;

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(|i| async move {
            // Do something here
            println!("{}", i);
        })
        .buffer_unordered(4)
        .collect::<()>()
        .await;
}
Although collect::<()>() has the word collect in it, it does not collect anything or build any data structure. It just loops over the stream and runs it to completion.
One thing to note: to use collect::<()>(), the Item of your stream must be (). In other words, you must handle both the result and the error before using this method. I think this makes perfect sense.
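The same trick can be tried without an async runtime, because the standard library's Iterator::collect also accepts () as a target when the items are (). The following is only a synchronous analogue of the stream version, not futures code, and the helper names run_to_completion, sum_via_side_effects and collect_errors are made up for illustration. It also sketches the "handle both result and error first" step by turning each Result into () before collecting.

```rust
// Drives an iterator purely for its side effects. `()` implements
// `FromIterator<()>`, so `collect::<()>()` stores nothing.
fn run_to_completion<I: Iterator<Item = ()>>(iter: I) {
    iter.collect::<()>()
}

// Hypothetical helper: every closure call returns `()`, so the mapped
// iterator can be collected into `()`.
fn sum_via_side_effects(n: u32) -> u32 {
    let mut total = 0;
    run_to_completion((0..n).map(|i| total += i));
    total
}

// Hypothetical helper: each `Result` is handled inside the closure, which
// leaves `()` as the item type that `collect::<()>()` requires.
fn collect_errors(results: Vec<Result<i32, String>>) -> Vec<String> {
    let mut errors = Vec::new();
    run_to_completion(results.into_iter().map(|r| match r {
        Ok(_) => {}
        Err(e) => errors.push(e),
    }));
    errors
}
```

Calling sum_via_side_effects(5) visits 0 through 4 and adds them up, even though nothing is ever stored by collect.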
Upvotes: 2
Reputation: 98328
The Sink trait is fallible (there is no TrySink), but drain() returns a Drain whose Error is Infallible.
StreamExt::forward() requires the stream to be fallible (actually a TryStream) and to have the same error type as the given sink. Your code fails because your error type is String, which does not match the Infallible error type of Drain, so the stream cannot be drained as is.
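To make the error-type requirement concrete, here is a small synchronous model of it. MiniSink, MiniDrain and forward_all are hypothetical stand-ins written for this illustration; they are not the futures API, and they only mimic the shape of the bound (the sink's Error must equal the source's error type).

```rust
use std::convert::Infallible;

// Hypothetical, simplified stand-in for a sink: it accepts items and may fail.
trait MiniSink<T> {
    type Error;
    fn send(&mut self, item: T) -> Result<(), Self::Error>;
}

// Stand-in for a draining sink: it accepts everything and can never fail,
// so its error type is `Infallible`.
struct MiniDrain;

impl<T> MiniSink<T> for MiniDrain {
    type Error = Infallible;
    fn send(&mut self, _item: T) -> Result<(), Infallible> {
        Ok(())
    }
}

// Stand-in for forwarding: the single parameter `E` ties the source's error
// type to the sink's, so a `Result<_, String>` source is rejected at compile
// time when the sink is `MiniDrain`.
fn forward_all<I, S, T, E>(source: I, sink: &mut S) -> Result<(), E>
where
    I: IntoIterator<Item = Result<T, E>>,
    S: MiniSink<T, Error = E>,
{
    for item in source {
        sink.send(item?)?;
    }
    Ok(())
}
```

With this model, forward_all((0..3).map(Ok::<i32, Infallible>), &mut MiniDrain) type-checks, while passing a source of Result<i32, String> items does not, which mirrors the mismatch between String and Infallible reported in the question.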
The solution, since you are filtering for the is_ok results, is to unwrap and rewrap the values:
use futures::{StreamExt, TryStreamExt};

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .map(|x| Ok(x.unwrap())) // <---- rewrap!
        .forward(futures::sink::drain())
        .await
        .unwrap();
}
I feel that there should be an easier way to build a Result<_, Infallible>, but I don't know how. You could write map_err(|_| panic!()) but that is hardly better.
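One possible way to avoid the unwrap, sketched here on plain iterators rather than streams, is to drop the errors with Result::ok and wrap the survivors in Ok with Infallible as the error type. The helper names keep_ok and into_ok are hypothetical. The same shape may carry over to a stream by using StreamExt::filter_map with futures::future::ready, but that variant has not been compiled here.

```rust
use std::convert::Infallible;

// Hypothetical helper: keeps only the successes and re-wraps them with an
// error type that has no values, without calling unwrap().
fn keep_ok<T, E>(
    results: impl IntoIterator<Item = Result<T, E>>,
) -> impl Iterator<Item = Result<T, Infallible>> {
    results
        .into_iter()
        .filter_map(|r| r.ok())
        .map(Ok::<T, Infallible>)
}

// Hypothetical helper: extracts the value from a Result that cannot hold an
// error. The empty match on `never` is exhaustive because `Infallible` has
// no variants.
fn into_ok<T>(r: Result<T, Infallible>) -> T {
    match r {
        Ok(v) => v,
        Err(never) => match never {},
    }
}
```

For example, keep_ok(vec![Ok(1), Err("x".to_string()), Ok(3)]).map(into_ok) yields 1 and 3, and no panic path is involved.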
Upvotes: 1