Kevin
Kevin

Reputation: 3348

convert `impl Stream<Item = Future<Output = IoResult<impl A>>>` to `impl Stream<Item = impl A>`

I have the following setup:

use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;

struct Dummy;

impl AsyncRead for Dummy {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        Poll::Pending
    }
}

fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
   futures::stream::iter((0..10).map(move |i| {
        futures::future::ok(Dummy{})
    }))
}

async fn connect (
    peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
    todo!()
}

#[tokio::main]
async fn main() {
    let peers = request_peers();
    let connected_peers = connect(peers).await;
}

playground link

I want to connect all peers by awaiting a future and ignore the peers which do not connect. Ideally, I would want to keep the peers in a future::stream::Stream. I thought that the following might work:

use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;

struct Dummy;

impl AsyncRead for Dummy {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        Poll::Pending
    }
}

fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
   futures::stream::iter((0..10).map(move |i| {
        println!("instantiated");
        futures::future::ok(Dummy{})
    }))
}

use futures::{StreamExt};

fn connect (
    peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
    peers.filter_map(|peer_fut| async move {
        if let Ok(peer) = peer_fut.await {
            tokio::time::sleep(core::time::Duration::from_secs(1)).await;
            println!("connected");
            Some(peer)
        } else {
            None
        }
    })
}

#[tokio::main]
async fn main() {
    let peers = request_peers();
    let connected_peers = connect(peers);
    
    connected_peers.for_each_concurrent(None, |peer| async {
        println!("processed")
    }).await;
}

playground link

But the peers are not connected concurrently, so this will take 10 seconds to finish - instead of ~1 sec.

I notice if I return a Vec instead of future::stream::Stream it will connect the peers concurrently with the following code snippet:

use futures::{StreamExt};

async fn connect (
    peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> Vec<impl tokio::io::AsyncRead> {
    let mut peers = peers.map(|peer_fut| async move {
        if let Ok(peer) = peer_fut.await {
            tokio::time::sleep(core::time::Duration::from_secs(1)).await;
            println!("connected");
            Some(peer)
        } else {
            None
        }
    })
    .buffer_unordered(50)
    .collect::<Vec<_>>().await;
    
    peers.into_iter().flatten().collect()
}

#[tokio::main]
async fn main() {
    let peers = request_peers();
    let connected_peers = connect(peers).await;
    
    futures::stream::iter(connected_peers).for_each_concurrent(None, |peer| async {
        println!("processed")
    }).await;
}

playground link

Is there a way to do this without converting to Vec and instead keeping the futures::stream::Stream ?

Upvotes: 0

Views: 146

Answers (1)

Marcus Ilgner
Marcus Ilgner

Reputation: 7211

This sounds like a good use case for FuturesUnordered

You create a number of futures (I.e. by running map and collect on a Vec), then convert them into an iterator which asynchronously yields results from whatever future completes first. If any futures return an error result, it could be skipped or handled appropriately.

Upvotes: 1

Related Questions