Marek Miettinen
Marek Miettinen

Reputation: 399

How to combine latest values of two streams using futures-rs?

How can I implement the equivalent of Rx combineLatest with futures-rs streams?

I'd like to get a stream of pairs with the current value of either stream, with the latest value from the other stream. The first value would be emitted only after having one value from each stream.

Upvotes: 4

Views: 982

Answers (3)

Legokichi Duckscallion
Legokichi Duckscallion

Reputation: 333

How about this (in futures-0.3)

futures::stream::select(
    st1.map(Either::Left),
    st2.map(Either::Right),
).fold((None, None), |(l, r), either|async move{
    let ret = match either {
        Either::Left(l) => (Some(l), r),
        Either::Right(r) => (l, Some(r)),
    };
    println!("{:?}", ret);
    ret
})

example: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=38827437b5227cfbf753062f727f2d0b

Upvotes: 2

Marcel
Marcel

Reputation: 199

Below my crude implementation. Feedback is welcome! Full setup including cargo here.

use futures::{Async, Stream, Poll};
use futures::stream::Fuse;

#[cfg(test)]
mod tests {

    extern crate tokio_core;
    extern crate tokio_timer;
    use self::tokio_timer::*;
    use self::tokio_core::reactor::Core;

    use futures::Stream;
    use std::time::Duration;

    use zip_latest::zip_latest;

    #[test]
    /// Combine two timer streams that produce values independent of each other
    /// Output
    /// ```
    ///    Tuple (Some(1), None)
    ///    Tuple (Some(2), None)
    ///    Tuple (Some(3), None)
    ///    Tuple (Some(4), None)
    ///    Tuple (Some(5), Some(1))
    ///    Tuple (Some(6), Some(1))
    ///    Tuple (Some(7), Some(1))
    ///    Tuple (Some(8), Some(1))
    ///    Tuple (Some(9), Some(1))
    ///    Tuple (Some(10), Some(2))
    /// ```
    ///
    fn combine_two_timer_streams() {
        let mut a_counter = 0;
        let mut b_counter = 0;

        let a = Timer::default()
            .interval(Duration::from_millis(1000))
            .map(|_| {
                a_counter += 1;
                a_counter
            })
            // We'll stop the stream after 10 loops
            .take_while(|x| Ok(x <= &10));
        let b = Timer::default().interval(Duration::from_millis(5000)).map(
            |_| {
                b_counter += 1;
                b_counter
            },
        );

        let zl = zip_latest(a, b).for_each(|tuple| {
            println!("Tuple {:?}", tuple);
            Ok(())
        });

        let mut core = Core::new().unwrap();
        core.run(zl).unwrap();
    }
}

#[derive(Debug)]
/// get at most one error at a time.
#[must_use = "streams do nothing unless polled"]
pub struct ZipLatest<S1: Stream, S2: Stream> {
    stream1: Fuse<S1>,
    stream2: Fuse<S2>,
    queued1: Option<S1::Item>,
    queued2: Option<S2::Item>,
}

///
/// Combines the latest output of two streams.
/// It will produce a new tuple everytime any of the streams produces a new value.
/// The stream will produce `Option::None` values for streams that have not yet produced a value.
///
pub fn zip_latest<S1, S2, T1: Clone, T2: Clone>(stream1: S1, stream2: S2) -> ZipLatest<S1, S2>
where
    S1: Stream<Item = T1>,
    S2: Stream<Item = T2, Error = S1::Error>,
{
    ZipLatest {
        stream1: stream1.fuse(),
        stream2: stream2.fuse(),
        queued1: None,
        queued2: None,
    }
}

impl<S1, S2, T1: Clone, T2: Clone> Stream for ZipLatest<S1, S2>
where
    S1: Stream<Item = T1>,
    S2: Stream<Item = T2, Error = S1::Error>,
{
    type Item = (Option<S1::Item>, Option<S2::Item>);
    type Error = S1::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

        match (self.stream1.poll()?, self.stream2.poll()?) {
            (Async::Ready(None), _) => Ok(Async::Ready(None)),
            (_, Async::Ready(None)) => Ok(Async::Ready(None)),
            (Async::Ready(Some(item1)), Async::Ready(Some(item2))) => {
                self.queued1 = Some(item1);
                self.queued2 = Some(item2);
                Ok(Some((self.queued1.clone(), self.queued2.clone())).into())
            },
            (Async::Ready(Some(item1)), Async::NotReady) => {
                self.queued1 = Some(item1);
                Ok(Some((self.queued1.clone(), self.queued2.clone())).into())
            },
            (Async::NotReady, Async::Ready(Some(item2))) => {
                self.queued2 = Some(item2);
                Ok(Some((self.queued1.clone(), self.queued2.clone())).into())
            },
            (Async::NotReady, Async::NotReady) => Ok(Async::NotReady)
        }
    }
}

Upvotes: 2

hfhc2
hfhc2

Reputation: 4421

How about the zip method of the Stream trait?

Upvotes: 0

Related Questions