Sty
Sty

Reputation: 800

`FuturesUnordered` does not satisfy `Stream`?

I want to implement a Stream that is based on a FuturesUnordered, which again is supposed to evaluate async functions with a return type of Result<SomeEnum>, though for simplicity of the argument let's assume it's only a Result<f64>. As async fns eventually return Futures, I assumed that the following way would be how I have to define my struct:

use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;

#[pin_project]
pub struct MyDerivedStream<'a> {
    #[pin]
    from_futures: FuturesUnordered<&'a (dyn Future<Output = Result<f64>> + Send)>,
}

impl Stream for MyDerivedStream<'_> {
    type Item = Result<f64>;

    fn poll_next(
        self: Pin<&mut Self>,
        c: &mut std::task::Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        let this = self.project();

        this.from_futures.poll_next(c)
    }
}

The problem I'm running into now is that for some reason the poll_next function on the FuturesUnordered fails to compile due to not satisfying Stream trait bounds. (See for yourself on this Playground example):

error[E0599]: the method `poll_next` exists for struct `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>`, but its trait bounds were not satisfied
  --> src/lib.rs:21:27
   |
21 |         this.from_futures.poll_next(c)
   |                           ^^^^^^^^^ method cannot be called on `Pin<&mut FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>>` due to unsatisfied trait bounds
   | 
  ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.14/src/stream/futures_unordered/mod.rs:55:1
   |
55 | pub struct FuturesUnordered<Fut> {
   | -------------------------------- doesn't satisfy `_: futures::Stream`
   |
   = note: the following trait bounds were not satisfied:
           `&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send: futures::Future`
           which is required by `FuturesUnordered<&dyn futures::Future<Output = std::result::Result<f64, anyhow::Error>> + std::marker::Send>: futures::Stream`

I'm struggling to understand the problem here. For all I can see FuturesUnordered does implement Stream, so what is the actual issue here? Is is the &'a dyn Future - and if so, how else would I need to define the type here to make this work?

Upvotes: 2

Views: 1232

Answers (1)

sebpuetz
sebpuetz

Reputation: 2618

&'a dyn Future does not implement Future, which is required by impl Stream for FuturesUnordered. One solution would be replacing &'a dyn Future by Pin<&mut 'a dyn Future>:

use anyhow::Result;
use futures::{Future, Stream, stream::FuturesUnordered};
use std::{pin::Pin, task::Poll};
use pin_project::pin_project;

#[pin_project]
pub struct MyDerivedStream<'a> {
    #[pin]
    from_futures: FuturesUnordered<Pin<&'a mut(dyn Future<Output = Result<f64>> + Send)>>,
}

impl<'a> Stream for MyDerivedStream<'a> {
    type Item = Result<f64>;

    fn poll_next(
        self: Pin<&mut Self>,
        c: &mut std::task::Context<'_>,
    ) -> Poll<Option<<Self as Stream>::Item>> {
        let this = self.project().from_futures;

        this.poll_next(c)
    }
}

It's necessary to mutably borrow the items in FuturesUnordered, this becomse evident by checking out the Futures::poll function which takes self: Pin<&mut Self>. The implementation of Stream for FuturesUnordered needs to poll the wrapped items in order to determine when a new item can be yielded, which is not possible with shared references.

Without the Pin around the &mut Future, it would be possible to mem::replace the wrapped future and cause that Future to never be actually polled.

This is a great resource to learn more about Pinning: https://fasterthanli.me/articles/pin-and-suffering

Upvotes: 3

Related Questions