dtech

Reputation: 14060

combineLatest emit only when one of the streams changes

I have a stream with frequent values and one with slower ones. I want to combine them, but only emit a value when the slower one emits. So combineLatest doesn't work. Like so:

a1
a2
b1
(a2,b1)
a3
a4
a5
b2
(a5,b2)
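
To pin down the behaviour shown above without involving Rx at all, here is a minimal sketch in plain Scala that folds over a merged timeline of events. The `Event`, `Fast`, `Slow` and `WithLatestSketch` names are hypothetical and exist only for this illustration; it models the semantics, not an actual Observable.

```scala
// Hypothetical event model for illustration only; not part of any Rx API.
sealed trait Event
final case class Fast(value: String) extends Event
final case class Slow(value: String) extends Event

object WithLatestSketch {
  // Walk the merged timeline, remembering the latest fast value and
  // emitting a pair only when a slow value arrives.
  def withLatest(events: List[Event]): List[(String, String)] = {
    val start = (Option.empty[String], Vector.empty[(String, String)])
    val (_, pairs) = events.foldLeft(start) {
      // A fast value only updates the remembered state.
      case ((_, out), Fast(a)) => (Some(a), out)
      // A slow value emits a pair, but only if a fast value has been seen.
      case ((last, out), Slow(b)) => (last, last.fold(out)(a => out :+ (a -> b)))
    }
    pairs.toList
  }
}
```

Feeding it the sequence a1, a2, b1, a3, a4, a5, b2 yields the two pairs from the listing. A slow value that arrives before any fast value is dropped in this sketch, which is an assumption about the desired behaviour.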

This is what I'm doing currently. Is there a cleaner way?

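def withLatest[A, B](fast: Observable[A], slow: Observable[B]): Observable[(A, B)] =
  Observable({ o =>
    // Latest value from fast; None until fast has emitted at least once
    var last: Option[A] = None
    fast.subscribe({ a => last = Some(a) })
    // Emit a pair only when slow emits and a fast value is available
    slow.subscribe({ b => last.foreach(a => o.onNext((a, b))) })
  })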

edit: This operator is now in Rx and is called withLatestFrom.
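
A minimal sketch of what that looks like in RxScala, assuming a version that ships withLatestFrom with a curried result selector (check the signature against the version you use). The `WithLatestFromDemo` object and its `run` method are hypothetical names; PublishSubject is used only so the values can be pushed by hand in a known order.

```scala
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

object WithLatestFromDemo {
  def run(): List[(String, String)] = {
    // Subjects are hot, so the emission order below is fully controlled.
    val fast = PublishSubject[String]()
    val slow = PublishSubject[String]()
    val seen = ListBuffer.empty[(String, String)]

    // slow drives emission; each slow value is paired with the latest fast value.
    slow.withLatestFrom(fast)((b, a) => (a, b))
      .subscribe(pair => { seen += pair; () })

    fast.onNext("a1"); fast.onNext("a2")
    slow.onNext("b1")
    fast.onNext("a3"); fast.onNext("a4"); fast.onNext("a5")
    slow.onNext("b2")

    seen.toList
  }
}
```

Note that the receiver is the slow stream: withLatestFrom emits only when the Observable it is called on emits, which is the reverse of the argument order in my withLatest above. The run is intended to reproduce the pairs from the diagram in the question.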

Upvotes: 8

Views: 4019

Answers (1)

André Staltz

Reputation: 13974

What you are looking for is a combinator I have called "combinePrev". It doesn't exist in the API, but it turns out to be necessary in many situations. The sample operator comes close, but it doesn't combine the two streams. I've also missed "combinePrev" in RxJS. It turns out that the implementation of "combinePrev" ("withLatest") is simple and depends only on map and switch:

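def withLatest[A, B](fast: Observable[A], slow: Observable[B]): Observable[(A, B)] = {
  // Share a single subscription to slow across the inner Observables
  val hotSlow = slow.publish.refCount
  // For each fast value, switch to a stream pairing it with later slow values
  fast.map({ a => hotSlow.map({ b => (a, b) }) }).switch
}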

Here is a jsfiddle example of the same operator implemented in RxJS.

As long as the operator is not in Rx, you can wrap it in an implicit class so that you can write slow.withLatest(fast):

implicit class RXwithLatest[B](slow: Observable[B]) {
  def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
}

Note: slow must be hot. If slow is a cold Observable, withLatest doesn't work.

Upvotes: 4
