Reputation: 3348
I have the following setup:
use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;
struct Dummy;
impl AsyncRead for Dummy {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Poll::Pending
}
}
fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
futures::stream::iter((0..10).map(move |i| {
futures::future::ok(Dummy{})
}))
}
async fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
todo!()
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers).await;
}
I want to connect all peers
by awaiting a future and ignore the peers which do not connect. Ideally, I would want to keep the peers in a future::stream::Stream
. I thought that the following might work:
use core::task::Poll;
use tokio::io::ReadBuf;
use core::task::Context;
use core::pin::Pin;
use std::error::Error;
use tokio::io::AsyncRead;
struct Dummy;
impl AsyncRead for Dummy {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Poll::Pending
}
}
fn request_peers() -> impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>> {
futures::stream::iter((0..10).map(move |i| {
println!("instantiated");
futures::future::ok(Dummy{})
}))
}
use futures::{StreamExt};
fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> impl futures::stream::Stream<Item = impl tokio::io::AsyncRead> {
peers.filter_map(|peer_fut| async move {
if let Ok(peer) = peer_fut.await {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
println!("connected");
Some(peer)
} else {
None
}
})
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers);
connected_peers.for_each_concurrent(None, |peer| async {
println!("processed")
}).await;
}
But the peers are not connected concurrently, so this will take 10 seconds to finish - instead of ~1 sec.
I notice if I return a Vec
instead of future::stream::Stream
it will connect the peers concurrently with the following code snippet:
use futures::{StreamExt};
async fn connect (
peers: impl futures::stream::Stream<Item = impl futures::Future<Output = tokio::io::Result<impl tokio::io::AsyncRead>>>
) -> Vec<impl tokio::io::AsyncRead> {
let mut peers = peers.map(|peer_fut| async move {
if let Ok(peer) = peer_fut.await {
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
println!("connected");
Some(peer)
} else {
None
}
})
.buffer_unordered(50)
.collect::<Vec<_>>().await;
peers.into_iter().flatten().collect()
}
#[tokio::main]
async fn main() {
let peers = request_peers();
let connected_peers = connect(peers).await;
futures::stream::iter(connected_peers).for_each_concurrent(None, |peer| async {
println!("processed")
}).await;
}
Is there a way to do this without converting to Vec
and instead keeping the futures::stream::Stream
?
Upvotes: 0
Views: 146
Reputation: 7211
This sounds like a good use case for FuturesUnordered
You create a number of futures (I.e. by running map and collect on a Vec), then convert them into an iterator which asynchronously yields results from whatever future completes first. If any futures return an error result, it could be skipped or handled appropriately.
Upvotes: 1