Artem Malinko
Artem Malinko

Reputation: 1771

How to run stream to completion in Rust using combinators (other than for_each) and without a while loop?

I have a stream which uses combinators and I need to run it to completion. I can use a while loop or the for_each combinator. They will both work, but I think there must be a nicer way.

Sink looks like what I'm looking for, especially sink::drain(), but I havn't been able to understand how to use it.

Using while loop

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    let mut stream = Box::pin(
        futures::stream::iter(0..20)
            .map(foo)
            .map_ok(|x| x * 10)
            .and_then(bar)
            .filter(|x| futures::future::ready(x.is_ok())),
    );

    while let Some(_) = stream.next().await {
        // Nothing to do here. I just need to run stream.
    }
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}

Using for_each:

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .for_each(|_| futures::future::ready(())) // Nothing to do here, just to run stream
        .await;
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}

I would like to have something like the following. It's not necessary to use the drain combinator exactly, just some combinator to run the stream:

use futures::{StreamExt, TryStreamExt}; // 0.3.6
use tokio; // 0.3.0

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .forward(futures::sink::drain())
        .await;
}

fn foo(x: i32) -> Result<i32, String> {
    if x != 10 {
        Ok(x)
    } else {
        Err("eeer".to_string())
    }
}

async fn bar(x: i32) -> Result<(), String> {
    async {
        if x == 13 {
            Err("errr".to_string())
        } else {
            Ok(())
        }
    }
    .await
}

This doesn't work, probably because drain puts some bounds on the Error type:

error[E0271]: type mismatch resolving `<futures::sink::Drain<()> as futures::Sink<()>>::Error == std::string::String`
  --> src/main.rs:11:10
   |
11 |         .forward(futures::sink::drain())
   |          ^^^^^^^ expected enum `std::convert::Infallible`, found struct `std::string::String`

error[E0271]: type mismatch resolving `<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]> as futures::Stream>::Item == std::result::Result<(), std::convert::Infallible>`
  --> src/main.rs:6:5
   |
6  | /     futures::stream::iter(0..20)
7  | |         .map(foo)
8  | |         .map_ok(|x| x * 10)
9  | |         .and_then(bar)
10 | |         .filter(|x| futures::future::ready(x.is_ok()))
11 | |         .forward(futures::sink::drain())
12 | |         .await;
   | |______________^ expected struct `std::string::String`, found enum `std::convert::Infallible`
   |
   = note: expected enum `std::result::Result<_, std::string::String>`
              found enum `std::result::Result<_, std::convert::Infallible>`
   = note: required because of the requirements on the impl of `futures::Future` for `futures_util::stream::stream::forward::Forward<futures::stream::Filter<futures::stream::AndThen<futures::stream::MapOk<futures::stream::Map<futures::stream::Iter<std::ops::Range<i32>>, fn(i32) -> std::result::Result<i32, std::string::String> {foo}>, [closure@src/main.rs:8:17: 8:27]>, impl futures::Future, fn(i32) -> impl futures::Future {bar}>, futures::future::Ready<bool>, [closure@src/main.rs:10:17: 10:54]>, futures::sink::Drain<()>, ()>`

Upvotes: 0

Views: 2265

Answers (2)

mibu
mibu

Reputation: 1537

You can use collect::<()>() to run the stream to completion. Example:

use futures::StreamExt;

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(|i| async move {
            // Do something here
            println!("{}", i);
        })
        .buffer_unordered(4)
        .collect::<()>()
        .await;
}

Although collect::<()>() has the word collect, it does not collect anything or build any data structure. It just loop over the stream and execute to completion.

One thing to note, to use collect::<()>() the Item of your stream must be (). In other words, you must handle both result and error before using this method. I think this make perfect sense.

Upvotes: 2

rodrigo
rodrigo

Reputation: 98328

Sink trait is fallible (there is no TrySink) but drain() returns a Drain whose Error is Infallible.

And Stream::forward() requires the stream to be fallible (actually TryStream) and have the same error type as the given sink. Your code fails because your error type is String, and that cannot be drained.

The solution, since you are filtering the is_ok results, it to unwrap and rewrap the values:

#[tokio::main]
async fn main() {
    futures::stream::iter(0..20)
        .map(foo)
        .map_ok(|x| x * 10)
        .and_then(bar)
        .filter(|x| futures::future::ready(x.is_ok()))
        .map(|x| Ok(x.unwrap())) // <---- rewrap!
        .forward(futures::sink::drain())
        .await.unwrap();
}

I feel that there should be an easier way to build a Result<_, Infallible>, but I don't know how. You could write map_err(|_| panic!()) but that is hardly better.

Upvotes: 1

Related Questions