james emanon
james emanon

Reputation: 11807

RxJS combineLatest confusion

I thought I understood combineLatest, but given my output - I'm not understanding it. I thought with combineLatest that all observables emit their last values whenever ANY of the observables emit.

(note: I just did the take(5) to limit my console output)

So, given this trivial example -

const int1$ = Rx.Observable.interval(1000).take(5)
const int2$ = Rx.Observable.interval(500).take(5)
const int3$ = Rx.Observable.interval(3000).take(5)
const all$ = Rx.Observable.combineLatest(
  int1$, int2$, int3$ 
)

all$.subscribe(latestValues => {
  const [int1, int2, int3] = latestValues;
  console.log(`
      interval one @ 1000 ${int1},
      interval two @ 500 ${int2},
      interval three @ 3000 ${int3}
  `)
})

I thought to see

"
      interval one @ 1000 0,
      interval two @ 500 1,
      interval three @ 3000 0
  "
"
      interval one @ 1000 1,
      interval two @ 500 2,
      interval three @ 3000 0
  "
"
      interval one @ 1000 1,
      interval two @ 500 3,
      interval three @ 3000 1
  "
"
      interval one @ 1000 2,
      interval two @ 500 4,
      interval three @ 3000 1

BUT I am getting

"
      interval one @ 1000 2,
      interval two @ 500 4,
      interval three @ 3000 0
  "
"
      interval one @ 1000 3,
      interval two @ 500 4,
      interval three @ 3000 0
  "
"
      interval one @ 1000 4,
      interval two @ 500 4,
      interval three @ 3000 0
  "
"
      interval one @ 1000 4,
      interval two @ 500 4,
      interval three @ 3000 1

slightly confused. Your thoughts on why I am not seeing what I expect would be awesome!

Upvotes: 1

Views: 2046

Answers (1)

0x384c0
0x384c0

Reputation: 2294

http://reactivex.io/documentation/operators/combinelatest.html CombineLatest emits an item whenever any of the source Observables emits an item

(so long as each of the source Observables has emitted at least one item) <<== THIS

int3 is not emitted any items before 3000ms, so Rx waits for it, then calls onNext with latest items

Possible solution: try to use timer instead (emits first value at 0s, then once every n s)

const int1$ = Rx.Observable.timer(0,1000).take(5)
const int2$ = Rx.Observable.timer(0,500).take(5)
const int3$ = Rx.Observable.timer(0,3000).take(5)
const all$ = Rx.Observable.combineLatest(
  int1$, int2$, int3$ 
)

all$.subscribe(latestValues => {
  const [int1, int2, int3] = latestValues;
  console.log(`
      interval one @ 1000 ${int1},
      interval two @ 500 ${int2},
      interval three @ 3000 ${int3}
  `)
})

Upvotes: 3

Related Questions