Ilyas Malgazhdarov
Ilyas Malgazhdarov

Reputation: 175

Stream of arrays into array of streams operator

In Rx.js, how to turn a stream of arrays into array of streams for example i have a stream of following:['0a','0b'], ['1a','1b'],['2a','2b','2c'] and i want to get the following streams:

0a---1a---2a--->
0b---1b---2b--->
          2c--->

Are there any operators for doing something like that or should I write one from scratch?

Upvotes: 3

Views: 4978

Answers (2)

almeynman
almeynman

Reputation: 7398

Something like this should work

stream.
  flatMap(array =>
    Rx.Observable.from(
      array.map((obj, i) => {index: i, ...obj})
    )
  ).groupBy(x => x.index, ).
  subscribe(x =>
    x.map((x,i) => subscribe(x))
  )

Upvotes: 2

user3743222
user3743222

Reputation: 18665

You can achieve it relatively easily with the existing operators.

What you want to achieve is very similar to what is described here : RXJS: alternately combine elements of streams

It suggests two ways :

  1. using the Rx.Observable.zip operator (takes as argument an array of observables and emit a stream of arrays whose element at index n is the xth value emitted by the nth observable)

    That solution however, applied in your example will stop at 1a,1b because the resulting observable will complete as soon as one of the observable completes.

  2. extending your arrays to give them all the same length by completing with dummy values, and applying the Rx.Observable.zip operator

In those both options :

  • if you remove the last line, .concatMap.... you will get a stream of array like [0a,0b], [1a,1b], [2a,2b,2c] from which you can easily map by index (.map(function(array){return array[N];}) will get you [Na,Nb...]) to get the stream which you want.
  • OR you can keep the exact same code and add .filter(function(value,index){return index % N == I}), where N is the number of streams, and I is the stream you want, i.e. the stream with values (Ia,Ib...)

Documentation about the zip operator > http://reactivex.io/documentation/operators/zip.html https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/zip.md,

Upvotes: 0

Related Questions