Dr. Light

Reputation: 434

How do I implement the trait futures::stream::Stream?

I'm getting a Response from the reqwest crate and passing it to an HttpResponseBuilder from the actix_web crate. However, I've tried and failed to understand how to implement the Stream trait from the futures crate on a custom struct that acts as a middleman and copies the contents to a file as they pass through.

I've tried doing this so far, but I'm not sure what to put inside that poll_next function to make it all work.

struct FileCache {
    stream: Box<dyn futures::Stream<Item = reqwest::Result<bytes::Bytes>>>,
}

impl FileCache {
    fn new(stream: Box<dyn futures::Stream<Item = reqwest::Result<bytes::Bytes>>>) -> Self {
        FileCache { stream }
    }
}

impl Stream for FileCache {
    type Item = reqwest::Result<bytes::Bytes>;
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
    }
}

Upvotes: 2

Views: 1544

Answers (1)

cdhowie

Reputation: 168958

This is possible but requires you to understand what pinning is and how to use it safely.

Basically, we just need to proxy to self.stream.poll_next(), but this method accepts Pin<&mut Self> (as you can see in your own implementation). Storing the box as Pin<Box<T>> instead of Box<T> will give us a way to obtain this Pin relatively easily, without requiring unsafe. Making this change is straightforward, since there is a From implementation allowing conversion of Box<T> to Pin<Box<T>> directly:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

struct FileCache {
    stream: Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>>>>,
}

impl FileCache {
    fn new(stream: Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>>>) -> FileCache {
        FileCache { stream: stream.into() }
    }
}
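
As a minimal sketch of that conversion in isolation, the following uses std's Future in place of futures::Stream so that it compiles without external crates; the pinning mechanics are the same. The names BoxedFuture, pin_via_from, pin_via_into_pin, NoopWake and poll_once are hypothetical helpers invented for this illustration. Box::into_pin is shown next to .into() because both perform the same conversion.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias, only to keep the signatures short.
type BoxedFuture = Box<dyn Future<Output = u32>>;

/// Pins an already-boxed trait object through `From<Box<T>> for Pin<Box<T>>`.
fn pin_via_from(boxed: BoxedFuture) -> Pin<BoxedFuture> {
    boxed.into()
}

/// The same conversion spelled out with `Box::into_pin`.
fn pin_via_into_pin(boxed: BoxedFuture) -> Pin<BoxedFuture> {
    Box::into_pin(boxed)
}

// Waker that does nothing; enough to poll a future that is ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the pinned box once. `as_mut()` yields `Pin<&mut dyn Future<..>>`.
fn poll_once(fut: &mut Pin<BoxedFuture>) -> Poll<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    let mut a = pin_via_from(Box::new(async { 1u32 }));
    let mut b = pin_via_into_pin(Box::new(async { 2u32 }));
    assert_eq!(poll_once(&mut a), Poll::Ready(1));
    assert_eq!(poll_once(&mut b), Poll::Ready(2));
}
```

No unsafe is needed at any point: once the value lives in a Pin<Box<T>>, as_mut() hands out the Pin<&mut T> that poll methods expect.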

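Now we have to figure out how to go from Pin<&mut FileCache> to Pin<&mut dyn Stream<...>>. The correct incantation here is self.get_mut().stream.as_mut(). Calling get_mut() is allowed because FileCache is Unpin (its only field is a Pin<Box<...>>, and Box<T> is always Unpin), and as_mut() then reborrows the pinned box as Pin<&mut dyn Stream<...>>: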

impl Stream for FileCache {
    type Item = reqwest::Result<bytes::Bytes>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.get_mut().stream.as_mut().poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(v) => {
                // Do what you need to do with v here.
                Poll::Ready(v)
            }
        }
    }
}
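
The reason get_mut() compiles here can be checked directly. The sketch below is a std-only analogue, with a hypothetical Proxy type wrapping a boxed Future instead of a Stream; assert_unpin, NoopWake and poll_proxy are likewise made up for the illustration. It assumes nothing beyond the standard library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for `FileCache`, wrapping a future instead of a stream.
struct Proxy {
    inner: Pin<Box<dyn Future<Output = u32>>>,
}

impl Future for Proxy {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // `get_mut` requires `Proxy: Unpin`; `as_mut` reborrows the pinned box.
        self.get_mut().inner.as_mut().poll(cx)
    }
}

// Only callable when `T: Unpin`, so a call to it is a compile-time check.
fn assert_unpin<T: Unpin>() {}

// Waker that does nothing; enough to poll a future that is ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Builds a `Proxy` around a future that yields `value` and polls it once.
fn poll_proxy(value: u32) -> Poll<u32> {
    // `Proxy` is `Unpin` even though `dyn Future` is not: `Box<T>` is always `Unpin`.
    assert_unpin::<Proxy>();
    let mut proxy = Proxy {
        inner: Box::pin(async move { value }),
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut proxy).poll(&mut cx)
}

fn main() {
    assert_eq!(poll_proxy(5), Poll::Ready(5));
}
```

If a field that is not Unpin were added to the wrapper, the assert_unpin call and the get_mut() call would both stop compiling, and a pin-projection approach would be needed instead.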

The catch is that poll_next isn't async, so you can't await whatever you're doing with v. bytes::Bytes is atomically reference-counted, though, so cloning it is cheap: you can clone the inner bytes::Bytes value and spawn a separate task on your executor. That is probably what you want anyway, so that whoever is polling FileCache doesn't have to wait for that task to complete before using the data. So you'd do something like:

            Poll::Ready(v) => {
                if let Some(Ok(bytes)) = &v {
                    let bytes = bytes.clone();
                    spawn_new_task(async move {
                        // Do something with bytes
                    });
                }
                Poll::Ready(v)
            }

Here spawn_new_task() is whatever function your executor provides for spawning tasks, e.g. tokio::spawn().

Now that we can see what we're doing here, we can simplify this down and eliminate the match by pushing Poll::Ready into our pattern, and unconditionally returning whatever the inner poll_next() call did:

impl Stream for FileCache {
    type Item = reqwest::Result<bytes::Bytes>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let r = self.get_mut().stream.as_mut().poll_next(cx);
        if let Poll::Ready(Some(Ok(bytes))) = &r {
            let bytes = bytes.clone();
            spawn_new_task(async move {
                // Do something with bytes
            });
        }
        r
    }
}
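
One thing to keep in mind when the goal is copying the data to a file: tasks spawned independently for each chunk may not run in the order they were spawned, so the chunks could be written out of order. A possible alternative, sketched below under heavy assumptions, is to send each cloned chunk through a channel to a single consumer that does the writing. To stay free of external crates, the sketch uses an Iterator in place of the Stream, Arc<[u8]> in place of bytes::Bytes, a std thread in place of a spawned task, and a Vec<u8> in place of the file; Chunk, Tee and run are hypothetical names. In async code an unbounded channel from your runtime, whose send does not need to be awaited, would play the role of std::sync::mpsc.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread;

// Stand-in for `bytes::Bytes`: cloning only bumps a reference count.
type Chunk = Arc<[u8]>;

// Hypothetical wrapper: forwards every chunk and sends a clone to a writer.
struct Tee<I> {
    inner: I,
    tx: Sender<Chunk>,
}

impl<I: Iterator<Item = Chunk>> Iterator for Tee<I> {
    type Item = Chunk;

    fn next(&mut self) -> Option<Chunk> {
        let item = self.inner.next();
        if let Some(chunk) = &item {
            // `send` does not block on this unbounded channel. An error only
            // means the writer is gone, which this sketch ignores.
            let _ = self.tx.send(Arc::clone(chunk));
        }
        item
    }
}

/// Runs the chunks through `Tee` and returns
/// (bytes seen by the caller, bytes collected by the writer).
fn run(chunks: Vec<Chunk>) -> (Vec<u8>, Vec<u8>) {
    let (tx, rx) = channel::<Chunk>();
    // Single consumer: chunks are appended in the order they were sent.
    let writer = thread::spawn(move || {
        let mut file: Vec<u8> = Vec::new(); // stand-in for a real file
        for chunk in rx {
            file.extend_from_slice(&chunk);
        }
        file
    });
    let tee = Tee { inner: chunks.into_iter(), tx };
    let seen: Vec<u8> = tee.flat_map(|c| c.to_vec()).collect();
    // `tee` was consumed above, which dropped `tx` and ends the writer's loop.
    (seen, writer.join().unwrap())
}

fn main() {
    let chunks: Vec<Chunk> = vec![Arc::from(&b"he"[..]), Arc::from(&b"llo"[..])];
    let (seen, written) = run(chunks);
    assert_eq!(seen, b"hello".to_vec());
    assert_eq!(written, b"hello".to_vec());
}
```

The trade-off is that an unbounded channel can buffer arbitrarily many chunks if the writer falls behind the consumer of the stream.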

Upvotes: 2
