Luigi Sgro

Reputation: 649

How to poll an asynchronous stream and futures generated from stream items in the same loop?

I am trying to consume messages coming from a stream, while at the same time processing async responses from REST service calls.

The following mock stream and future illustrate the structure of the program:

use tokio::runtime;

use futures::future;
use futures::stream::{self, StreamExt};

async fn demo() {
    let mut s = stream::repeat::<u32>(9);

    let mut counter = 0;
    while let Some(n) = s.next().await {
        counter += 1;
        if counter % 100 == 0 {
            // simulate a condition that requires sending a msg
            let f = future::ready::<u32>(10);
            // How to poll future 'f' in the same loop as stream 's'?
        }
    }
}

fn main() {
    let runtime = runtime::Builder::new_current_thread()
        .enable_io()
        .build()
        .unwrap();
    runtime.block_on(async move { demo().await });
}

How can I wait, in the same loop, for either the next stream item or the output of a pending future (when one exists), whichever becomes ready first?

This structure would make it possible for a trading strategy, which sends orders based on a stream of market data items, to listen to the order service results and the market data messages in the same function.

Upvotes: 3

Views: 2622

Answers (1)

Luigi Sgro

Reputation: 649

The try_flatten() method turns a stream of streams into a single stream. The only catch is that it is defined on the TryStreamExt trait, which applies to streams of Result items, and the inner streams must yield Result items as well.

This means that instead of the type u32 used in my example, we have to deal with the type Result<u32, ()>.

To build a stream out of a future, the stream::once() function is used.

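To make a stream of streams, an MPSC channel is used, as in the try_flatten() method documentation: the receiving end is the outer stream, and every stream sent through the channel becomes one of its inner streams.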

Here is the complete code of my solution. I added explicit type annotations to make it clearer what is going on.

use tokio::runtime;

use futures::future;
use futures::stream::{self, StreamExt, TryStreamExt};

use futures::channel::mpsc;

async fn demo() {
    let s: stream::Repeat<Result<u32, ()>> = stream::repeat::<Result<u32, ()>>(Ok(9));
    let (tx, rx): (
        mpsc::UnboundedSender<Result<stream::Once<future::Ready<Result<u32, ()>>>, ()>>,
        mpsc::UnboundedReceiver<Result<stream::Once<future::Ready<Result<u32, ()>>>, ()>>,
    ) = mpsc::unbounded();
    // `rx` is a stream of streams: flatten it, then interleave it with `s`.
    let mut merged = stream::select(s, rx.try_flatten());

    let mut counter: u32 = 0;
    while let Some(n) = merged.next().await {
        println!("{:?}", n);
        counter += 1;
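        if counter % 100 == 0 {
            // Wrap the future in a one-item stream and push it into the channel;
            // `merged` will then yield its output alongside the items of `s`.
            let f: stream::Once<future::Ready<Result<u32, ()>>> =
                stream::once(future::ready::<Result<u32, ()>>(Ok(10)));
            tx.unbounded_send(Ok(f)).unwrap();
        }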
    }
}

fn main() {
    let runtime = runtime::Builder::new_current_thread()
        .enable_io()
        .build()
        .unwrap();
    runtime.block_on(async move { demo().await });
}

I don't think that using the Tokio runtime makes a difference here, but I might be wrong. Since I am still learning async Rust, any constructive comment or suggestion is very welcome.

Upvotes: 0
