Midnight Exigent
Midnight Exigent

Reputation: 625

Is there a tuple_windows() adapter for async streams?

I have a code the looks like this

use itertools::Itertools;

let (tx, rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
    for (v1, v2) in rx.into_iter().tuple_windows() {
        // do some computation
    }
}

for v in (0..) {
    tx.send(v).unwrap();
}

When I change the channel to a tokio::mpsc::channel(), rx becomes an async stream (ie. futures::Stream) which doesn't have the .tuple_windows() adapter

Do you know of a crate that provides similar functionality as Itertools for Streams ? If not, how do you recommend doing this ?

Upvotes: 1

Views: 369

Answers (2)

Midnight Exigent
Midnight Exigent

Reputation: 625

OP here, I ended up following @Netwave's answer and implementing my own extension, the actual implementation was a bit different in order to yield a sliding windows (like .tuple_windows() for Itertools): [0, 1, 2, 3] -> [(0, 1), (1, 2), (2, 3)]

It wasn't trivial, so here it is for anyone who might need it

impl<T: Stream> TupleWindowsExt for T {}
trait TupleWindowsExt: Stream {
    fn tuple_windows(self) -> TupleWindows<Self>
    where
        Self: Sized,
    {
        TupleWindows::new(self)
    }
}

pin_project! {
    #[derive(Debug)]
    struct TupleWindows<S: Stream> {
        #[pin]
        stream: S,
        previous: Option<S::Item>,
    }
}

impl<S: Stream> TupleWindows<S> {
    pub fn new(stream: S) -> Self {
        Self {
            stream,
            previous: None,
        }
    }
}

impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    type Item = (S::Item, S::Item);

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
            Some(next) => next,
            None => return Poll::Ready(None),
        };

        if let Some(previous) = this.previous {
            let res = (previous.clone(), current.clone());
            *this.previous = Some(current);
            Poll::Ready(Some(res))
        } else {
            let next = match this.stream.poll_next(cx) {
                Poll::Ready(next) => next,
                Poll::Pending => {
                    *this.previous = Some(current);
                    return Poll::Pending;
                }
            };
            *this.previous = next.clone();
            Poll::Ready(next.map(|next| (current, next)))
        }
    }
}


Upvotes: 1

Netwave
Netwave

Reputation: 42766

There is a StreamExt from Futures. There is no windows functionality there, but you could use it to implement your own extension over it.

Something like:

use async_trait::async_trait;
use futures::stream::StreamExt;
use std::pin::Pin;

#[async_trait]
trait TuplesWindowsExt: StreamExt + Unpin {
    async fn tuples(
        self: &mut Pin<Box<Self>>,
    ) -> (
        Option<<Self as futures::Stream>::Item>,
        Option<<Self as futures::Stream>::Item>,
    )
    where
        <Self as futures::Stream>::Item: Send,
    {
        let a = self.next().await;
        let b = self.next().await;
        (a, b)
    }
}

Playground

Upvotes: 1

Related Questions