Reputation: 800
I want to implement a Stream
that is based on a FuturesUnordered
, which again is supposed to evaluate async
functions with a return type of Result<SomeEnum>
, though for simplicity of the argument let's assume it's only a Result<f64>
. As async fn
s eventually return Future
s, I assumed that the following way would be how I have to define my struct:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}
impl Stream for MyDerivedStream<'_> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project();
this.from_futures.poll_next(c)
}
}
The problem I'm running into now is that for some reason the poll_next
function on the FuturesUnordered
fails to compile due to not satisfying Stream
trait bounds. (See for yourself on this Playground example):
error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
--> src/lib.rs:21:27
|
21 | this.from_futures.poll_next(c)
| ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
|
55 | pub struct FuturesUnordered<Fut> {
| -------------------------------- doesn't satisfy `_: futures::Stream`
|
= note: the following trait bounds were not satisfied:
`&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`
I'm struggling to understand the problem here. For all I can see FuturesUnordered
does implement Stream
, so what is the actual issue here? Is is the &'a dyn Future
- and if so, how else would I need to define the type here to make this work?
Upvotes: 2
Views: 1232
Reputation: 2618
&'a dyn Future
does not implement Future
, which is required by impl Stream for FuturesUnordered
. One solution would be replacing &'a dyn Future
by Pin<&mut 'a dyn Future>
:
use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;
#[pin_project]
pub struct MyDerivedStream<'a> {
#[pin]
from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}
impl<'a> Stream for MyDerivedStream<'a> {
type Item = Result<f64>;
fn poll_next(
self: Pin<&mut Self>,
c: &mut std::task::Context<'_>,
) -> Poll<Option<<Self as Stream>::Item>> {
let this = self.project().from_futures;
this.poll_next(c)
}
}
It's necessary to mutably borrow the items in FuturesUnordered
, this becomse evident by checking out the Futures::poll
function which takes self: Pin<&mut Self>
. The implementation of Stream for FuturesUnordered
needs to poll the wrapped items in order to determine when a new item can be yielded, which is not possible with shared references.
Without the Pin
around the &mut Future
, it would be possible to mem::replace
the wrapped future and cause that Future
to never be actually polled.
This is a great resource to learn more about Pinning: https://fasterthanli.me/articles/pin-and-suffering
Upvotes: 3