Christian

Reputation: 859

Add n+1 streams to a RXJS stream with combineLatest

Hope someone can help me with this problem. I have two streams that I need to combine with the combineLatest operator. Later on, I need to dynamically add more streams to the same combineLatest. Here is what I need to do:

stream a ---------a---------------b-------------c------------------------>
stream b ----1--------2-----3-------------------------4------------------>
stream c (not defined at start)      -----z-----------------x------------>
stream d (not defined at start)                         ----------k------>
                                 (combineLatest)
result   ---------(a1)(a2)--(a3)--(b3)----(b3z)-(c3z)-(c4z)-(c4x)-(c4xk)->   

UPDATE

To be more specific, I want to turn this stream (link)

into this result:

A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--H3a-H3b--I3b

Upvotes: 7

Views: 2385

Answers (3)

Richard Matsen

Reputation: 23483

Taking Oles' answer, simplifying it a little, and adding the test data given in the question's update:

const Observable = Rx.Observable   // used below as Observable.combineLatest
const Subject = Rx.Subject
const ReplaySubject = Rx.ReplaySubject

const newStream =  new Subject()

// Set up output, no streams yet
const streamOfStreams = newStream
  .scan( (acc, stream) => {
    acc.push(stream);
    return acc; 
  }, [])
  .switchMap(vs => Observable.combineLatest(vs))
  .map(arrayOfValues => arrayOfValues.join(''))    // declutter
  .subscribe(console.log)

// Add a stream
const s1 = new ReplaySubject() 
newStream.next(s1)
// emit on streams
s1.next('A'); s1.next('B')

// Add a stream
const s2 = new ReplaySubject() 
newStream.next(s2)
// emit on streams
s2.next('0'); s1.next('C')
s1.next('D'); s2.next('1'); s1.next('E'); 

// Add a stream
const s3 = new ReplaySubject() 
newStream.next(s3)
// emit on streams
s3.next('a'); 
s1.next('F'); s2.next('2'); s1.next('G'); s2.next('3'); s1.next('H'); 
s3.next('b'); s1.next('I')

Working example: CodePen


Update

Christian has kindly supplied some test streams which are more 'real world' than the sequenced Subjects I've used above. Unfortunately, these highlight a bug in the solution as it stands.

For reference, the new test streams are

const streamA = Rx.Observable.timer(0,800).map(x => String.fromCharCode(x+ 65));
const streamB = Rx.Observable.timer(0,1300).map(x => x);
const streamC = Rx.Observable.timer(1100, 2000).map(x => String.fromCharCode(x+ 97));

setTimeout(() => newStream.next(streamA), 500);
setTimeout(() => newStream.next(streamB), 2000);
setTimeout(() => newStream.next(streamC), 3000);

Problem #1

The first problem stems from the core line in streamOfStreams,

  .switchMap(vs => Observable.combineLatest(vs))

This essentially says: every time a new array of streams appears, map it to a combineLatest() of the new array and switch to the new observable. However, the test observables are cold, which means each re-subscription restarts the stream from its first value.

Ref: Introduction to Rx - Hot and Cold observables

Some observable sequences can appear to be hot when they are in fact cold. A couple of examples that surprise many is Observable.Interval and Observable.Timer

So we get
- expected A--B--B0...
- actual A--B--A0--B0...
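To make the hot/cold difference concrete without timers, here is a small dependency-free sketch. The `cold` and `hot` helpers are stand-ins written for this illustration, not RxJS APIs; they only model who receives which values.

```javascript
// Cold stand-in: every subscription replays the values from the start.
const cold = (values) => ({
  subscribe(observer) { values.forEach(observer); }
});

// Hot stand-in: values are pushed once; late subscribers miss earlier ones.
const hot = () => {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    next(value) { observers.forEach(o => o(value)); }
  };
};

const coldLetters = cold(['A', 'B']);
const first = [], second = [];
coldLetters.subscribe(v => first.push(v));
coldLetters.subscribe(v => second.push(v));   // re-subscription starts over at 'A'

const hotLetters = hot();
const early = [], late = [];
hotLetters.subscribe(v => early.push(v));
hotLetters.next('A');
hotLetters.subscribe(v => late.push(v));      // joins after 'A' was emitted
hotLetters.next('B');

console.log(first, second); // [ 'A', 'B' ] [ 'A', 'B' ]
console.log(early, late);   // [ 'A', 'B' ] [ 'B' ]
```

The second cold subscription seeing 'A' again is the same effect that produces the unexpected A0 in the actual output.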

The obvious solution is to turn the cold observables hot,

const asHot = (stream) => {
  const hot = stream.multicast(() => new Rx.Subject())
  hot.connect()
  return hot
}

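but this omits B0 from the sequence (giving A--B--C0...), because the last value of streamA is lost when combineLatest re-subscribes. So we want hot plus the one previous value, which can be had with a ReplaySubject with a buffer size of one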

const asBuffered = (stream) => {
  const bufferOne = new ReplaySubject(1)
  stream.subscribe(value => bufferOne.next(value))
  return bufferOne
}
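As a rough illustration of what a buffer size of one buys, here is a dependency-free model of a replay-one subject. The `ReplayOne` class is a simplified stand-in written for this sketch (no error or completion handling), not the RxJS class.

```javascript
// Minimal stand-in for a replay-one subject (not the RxJS ReplaySubject).
class ReplayOne {
  constructor() {
    this.observers = [];
    this.hasValue = false;
    this.last = undefined;
  }
  next(value) {
    this.hasValue = true;
    this.last = value;
    this.observers.forEach(fn => fn(value));
  }
  subscribe(fn) {
    if (this.hasValue) fn(this.last);   // replay only the most recent value
    this.observers.push(fn);
    // Return an unsubscribe function.
    return () => { this.observers = this.observers.filter(o => o !== fn); };
  }
}

const source = new ReplayOne();
source.next('A');
source.next('B');

const seen = [];
const unsubscribe = source.subscribe(v => seen.push(v)); // late subscriber
source.next('C');
unsubscribe();
source.next('D');                                        // no longer delivered

console.log(seen); // [ 'B', 'C' ]
```

The late subscriber gets 'B' immediately rather than the full history, which is why a re-subscribed combineLatest can emit B0 as soon as the second stream produces '0'.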

Problem #2

The second problem comes from the fact that streamC delays its first emission by 1100ms (good test, Christian!).

This results in
- expected A--B--B0--C0--D0--D1--E1--E1a...
- actual A--B--B0--C0--D0--E1a...

which means we need to delay adding a stream until its first emission

const addStreamOnFirstEmit = (stream) => {
  const buffered = asBuffered(stream)
  buffered.first().subscribe( _ => {
    newStream.next(buffered)
  })
}

Working example: CodePen

Notes on the CodePen

I've left in the various streamAdder functions for experimentation, and there are also _debug versions that emit the streams and the addStream events to show the sequence.

I've also limited the source streams so that the console doesn't scroll too much.

Note on the expected output

The new solution diverges from the expected output given in the question after 'G3a'

  • expected A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--H3a-H3b--I3b
  • actual A----B---B0-C0--D0--D1--E1--E1a--F1a--F2a---G2a---G3a--G3b--H3b--I3b

which is due to the simultaneous emission of 'H' and 'b'. Problem #3?
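As a rough check of the two sequences, the following dependency-free sketch (plain JavaScript, not RxJS) replays the emission times implied by the test streams and lets a stream join the combination on its first emission. The names `combineAsTheyArrive` and `timeline` are made up for this illustration, and the times assume each timer starts counting when its setTimeout fires (500ms, 2000ms, 3000ms).

```javascript
// Each event is [timeMs, streamName, value].
// A stream joins the combination the first time it emits.
function combineAsTheyArrive(events) {
  const order = [];   // stream names in order of first emission
  const latest = {};  // latest value per stream
  const out = [];
  for (const [, name, value] of events) {
    if (!(name in latest)) order.push(name);
    latest[name] = value;
    out.push(order.map(n => latest[n]).join(''));
  }
  return out;
}

// Hypothetical helper: emission times of a timer-like stream up to `until`.
function timeline(name, firstAt, period, toValue, until) {
  const events = [];
  for (let i = 0, t = firstAt; t <= until; i++, t += period) {
    events.push([t, name, toValue(i)]);
  }
  return events;
}

const a = timeline('A', 500, 800, i => String.fromCharCode(65 + i), 6900);
const b = timeline('B', 2000, 1300, i => String(i), 6900);
const c = timeline('C', 3000 + 1100, 2000, i => String.fromCharCode(97 + i), 6900);

// Stable sort by time, so on a tie the concatenation order decides.
const byTime = (x, y) => x[0] - y[0];
const upperFirst = [...a, ...b, ...c].sort(byTime); // 'H' handled before 'b'
const lowerFirst = [...c, ...a, ...b].sort(byTime); // 'b' handled before 'H'

const upperResult = combineAsTheyArrive(upperFirst).join('--');
const lowerResult = combineAsTheyArrive(lowerFirst).join('--');
console.log(upperResult); // ...--G3a--H3a--H3b--I3b
console.log(lowerResult); // ...--G3a--G3b--H3b--I3b
```

Both orderings agree up to 'G3a'. The only difference is which of the two values emitted at 6100ms is processed first, which matches the divergence between the expected and actual output.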

One more test

To see whether the solution fails when streamC delays its first emission until after two emissions of streamA and streamB, I changed the delay to 1800ms

const streamC = Rx.Observable.timer(1800, 2000).map(x => String.fromCharCode(x+ 97));

I believe the output for this test is correct.

Upvotes: 3

Oles Savluk

Reputation: 4345

The idea is that everything is a stream, even a stream of streams :)

const onNew$ = new Rx.Subject();

const a$ = Rx.Observable.interval(1000).mapTo('a');
const b$ = Rx.Observable.interval(1000).mapTo('b');

const comb$ = Rx.Observable
  .merge(
    onNew$,
    Rx.Observable.from([a$, b$]),
  )
  .scan((acc, v) => {
      acc.push(v);
      return acc;
    }, [])
  .switchMap(vs => Rx.Observable.combineLatest(vs))

comb$.take(4).subscribe(v => console.log(v));

setTimeout(
  () => onNew$.next(Rx.Observable.interval(1000).mapTo('c')),
  2000,
);
setTimeout(
  () => onNew$.next(Rx.Observable.interval(1000).mapTo('d')),
  4000,
);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Upvotes: 3

Richard Matsen

Reputation: 23483

This can be done if you can unsubscribe and re-subscribe for each new stream:

const { Observable, ReplaySubject } = Rx

// Start with two streams
const s1 = new ReplaySubject(1) 
const s2 = new ReplaySubject(1) 
let out = Observable.combineLatest(s1, s2)
let subscription = out.subscribe(console.log)
s2.next('1'); s1.next('a'); s2.next('2'); s2.next('3'); s1.next('b')

// Add a new stream
subscription.unsubscribe()
const s3 = new ReplaySubject(1) 
out = Observable.combineLatest(s1, s2, s3)
subscription = out.subscribe(console.log)
s3.next('z'); s1.next('c'); s2.next('4'); s3.next('x')

// Add a new stream
subscription.unsubscribe()
const s4 = new ReplaySubject(1) 
out = Observable.combineLatest(s1, s2, s3, s4)
subscription = out.subscribe(console.log)
s4.next('k')

Working example: CodePen

Upvotes: 0
