Reputation: 625
I have code that looks like this:
use itertools::Itertools;
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
for (v1, v2) in rx.into_iter().tuple_windows() {
// do some computation
}
}
for v in (0..) {
tx.send(v).unwrap();
}
When I change the channel to a tokio::sync::mpsc::channel()
, rx
becomes an async stream (ie. futures::Stream
) which doesn't have the .tuple_windows()
adapter
Do you know of a crate that provides functionality similar to Itertools
for Stream
s? If not, how do you recommend doing this?
Upvotes: 1
Views: 369
Reputation: 625
OP here. I ended up following @Netwave's answer and implementing my own extension. The actual implementation is a bit different in order to yield sliding windows (like .tuple_windows()
for Itertools
): [0, 1, 2, 3]
-> [(0, 1), (1, 2), (2, 3)]
It wasn't trivial, so here it is for anyone who might need it:
/// Extension trait that gives every [`Stream`] a `tuple_windows` adapter.
trait TupleWindowsExt: Stream {
    /// Consumes the stream and wraps it in a [`TupleWindows`] adapter.
    ///
    /// Requires `Self: Sized` because the adapter takes the stream by value.
    fn tuple_windows(self) -> TupleWindows<Self>
    where
        Self: Sized,
    {
        TupleWindows::new(self)
    }
}

// Blanket implementation: any type that is a `Stream` gets the extension method.
impl<S> TupleWindowsExt for S where S: Stream {}
pin_project! {
// Stream adapter that pairs each item of the wrapped stream with the item
// received just before it.
#[derive(Debug)]
struct TupleWindows<S: Stream> {
// The wrapped stream; `#[pin]` makes the projection expose it as a pinned
// reference so it can be polled.
#[pin]
stream: S,
// Most recently received item, kept as the left half of the next pair.
// Starts out as `None`.
previous: Option<S::Item>,
}
}
impl<S: Stream> TupleWindows<S> {
    /// Wraps `stream` in an adapter that has not yet remembered any item.
    pub fn new(stream: S) -> Self {
        let previous = None;
        Self { stream, previous }
    }
}
impl<S: Stream> Stream for TupleWindows<S>
where
    S::Item: Clone,
{
    /// An overlapping pair of consecutive items: `(previous, current)`.
    type Item = (S::Item, S::Item);

    /// Yields sliding pairs of the inner stream's items, e.g.
    /// `[0, 1, 2, 3]` becomes `[(0, 1), (1, 2), (2, 3)]`.
    ///
    /// Returns `Poll::Pending` whenever the inner stream is pending and
    /// `Poll::Ready(None)` once the inner stream ends. A stream with fewer
    /// than two items therefore yields no pairs.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            // `ready!` returns `Poll::Pending` early. Anything already stored in
            // `previous` stays there, so no item is lost between polls.
            let current = match futures::ready!(this.stream.as_mut().poll_next(cx)) {
                Some(item) => item,
                None => return Poll::Ready(None),
            };
            // Store `current` as the left half of the next window and take out
            // the item stored before it. One clone per item is enough, since
            // the old value is moved out rather than copied.
            if let Some(previous) = this.previous.replace(current.clone()) {
                return Poll::Ready(Some((previous, current)));
            }
            // This was the very first item, so there is nothing to pair it
            // with yet. Poll the inner stream again for its successor.
        }
    }
}
Upvotes: 1
Reputation: 42766
There is a StreamExt
from Futures.
There is no windows functionality there, but you could build your own extension on top of it.
Something like:
use async_trait::async_trait;
use futures::stream::StreamExt;
use std::pin::Pin;
#[async_trait]
trait TuplesWindowsExt: StreamExt + Unpin {
    /// Awaits the next two items of the stream and returns them as a pair.
    ///
    /// Each slot holds exactly what the corresponding `next()` call resolved to.
    /// Successive calls return non-overlapping pairs, because every call
    /// consumes two items.
    async fn tuples(
        self: &mut Pin<Box<Self>>,
    ) -> (
        Option<<Self as futures::Stream>::Item>,
        Option<<Self as futures::Stream>::Item>,
    )
    where
        <Self as futures::Stream>::Item: Send,
    {
        // Tuple operands are evaluated left to right, so the first slot is
        // awaited to completion before the second `next()` is started.
        (self.next().await, self.next().await)
    }
}
Upvotes: 1