davesnx

Reputation: 384

Is there a way to create this sequence of streams?

I'm trying to implement this marble diagram, under the hypothesis that there are N streams sN$ and that I add these streams to main$.

s1$    +--1--------------------99--------------------->
s2$    +------3--------7------------------------------>

main$  +--[1]-[1, 3]---[1, 7]--[99, 7]---------------->

Right now I have an approximation, but it keeps the "repetitions":

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()

main$
  .scan((a, c) => [...a, c], [])
  .subscribe(v => console.log(v))

s1$.subscribe(x => main$.onNext(x))
s2$.subscribe(x => main$.onNext(x))    

s1$.onNext(3)
s2$.onNext(1)

s1$.onNext(6)
s2$.onNext(44)

/*
  Expect:
    [3]
    [3, 1]
    [6, 1]
    [6, 44]
*/

/*
  What I have:
     [3]
     [3, 1]
     [3, 1, 6]
     [3, 1, 6, 44]
*/

Is there a way to do this? I also tried adding the sN$ streams to main$:

const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()

main$
  .mergeAll()
  .scan((a, c) => [...a, c], [])
  .subscribe(
    (v) => console.log(v)
  )

main$.onNext(s1$)
main$.onNext(s2$)

s1$.onNext(3)
s2$.onNext(1)

s1$.onNext(6)
s2$.onNext(44)

Upvotes: 0

Views: 189

Answers (2)

davesnx

Reputation: 384

I finally solved the problem by filtering out the nulls that I introduce with startWith():

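main$
  // collect every stream added so far, each seeded with null and replaying its last value
  .scan((a, c) => [...a, c.startWith(null).shareReplay(1)], [])
  // combine the latest value of every collected stream
  .map(obs => Rx.Observable.combineLatest(obs))
  // keep only the most recent combination
  .switch()
  // drop the null placeholders
  .map((x) => x.filter((x) => x != null))
  // skip results that are empty after filtering
  .filter((x) => x.length)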

It looks unreadable (like any Rx sequence), but if you draw the marble diagram it makes total sense!
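
For readers who prefer to trace it by hand, here is a rough sketch in plain JavaScript (no Rx involved) of the bookkeeping the pipeline above ends up doing: one slot per added stream, seeded with null, with the nulls filtered out before emitting. The names `makeMain` and `add` are hypothetical and exist only for this illustration; the sketch ignores subscription timing and replay, so treat it as a mental model rather than an equivalent implementation.

```javascript
// Hypothetical plain-JavaScript model of the pipeline; this is not Rx code.
function makeMain(onValue) {
  const latest = [] // one slot per stream added so far

  const emit = () => {
    const values = latest.filter(v => v != null) // drop the null placeholders
    if (values.length) onValue(values)           // skip empty results
  }

  return {
    // Plays the role of main$.onNext(stream): reserve a slot seeded with null,
    // which is what startWith(null) does for the real stream.
    add() {
      const index = latest.push(null) - 1
      emit()
      // The returned function plays the role of stream.onNext(value).
      return value => {
        latest[index] = value
        emit()
      }
    }
  }
}

const out = []
const main = makeMain(v => out.push(v))
const s1 = main.add()
const s2 = main.add()

s1(3)
s2(1)
s1(6)
s2(44)

console.log(out)
```

With the input from the question, `out` ends up holding the four arrays listed under "Expect".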

Upvotes: 0

Dorus

Reputation: 7546

You can use combineLatest. It still requires every stream to have produced a value before it emits anything, but you can use startWith to prefix a placeholder value (undefined here) so that every stream starts with something.

const source = Rx.Observable.combineLatest(
  s1.startWith(void 0),
  s2.startWith(void 0),
  s3.startWith(void 0),
  (s1, s2, s3) => [s1, s2, s3])
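
To make the behaviour concrete, here is a minimal sketch in plain JavaScript (no Rx involved) of what combineLatest produces once every input has been seeded with a placeholder. `makeCombiner` and `push` are hypothetical names used only for this illustration; the sketch models the observable behaviour and is not how Rx implements the operator.

```javascript
// Hypothetical model of combineLatest + startWith(undefined); this is not Rx code.
function makeCombiner(count, onValue) {
  // Every slot starts as undefined, which plays the role of startWith(void 0).
  const latest = new Array(count).fill(undefined)

  // Because every input already has a value, one combined value is emitted up front.
  onValue(latest.slice())

  return function push(index, value) {
    latest[index] = value
    // Emit a snapshot holding the latest value of every input.
    onValue(latest.slice())
  }
}

const seen = []
const push = makeCombiner(2, v => seen.push(v))

push(0, 1)  // s1 emits 1
push(1, 3)  // s2 emits 3
push(1, 7)  // s2 emits 7
push(0, 99) // s1 emits 99

console.log(seen)
```

Apart from the initial all-placeholder snapshot and the `undefined` slot in the second one, the snapshots follow the marble diagram from the question: `[1, 3]`, `[1, 7]`, `[99, 7]`.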

Optionally, you can remove the undefined values from the resulting array.
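
One possible way to do that, sketched as a small helper (the name `dropPlaceholders` is made up for this example). It compares strictly against undefined, so legitimate falsy values such as 0 or an empty string survive, which a plain truthiness check would not guarantee.

```javascript
// Hypothetical helper: drop the undefined placeholders introduced by startWith(void 0).
const dropPlaceholders = values => values.filter(v => v !== undefined)

const mixed = dropPlaceholders([1, undefined, 5])
const falsy = dropPlaceholders([0, undefined, ''])

console.log(mixed)
console.log(falsy)
```

Assuming the `source` observable from the snippet above, it could be applied as `source.map(dropPlaceholders)`.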

Now, we can extend that to work with a variable list of streams. Credits to @xgrommx.

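main$
 // collect every stream pushed into main$ so far
 .scan((a, c) => a.concat(c), [])
 // combine the latest values of the collected streams
 .map(obs => Rx.Observable.combineLatest(obs))
 // switch() takes no selector; it flattens to the newest combination
 .switch()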

We can also use c.shareReplay(1) to make streams remember their last value when we switch. That, however, won't combine with c.startWith(void 0), so we can use one or the other.

Example:

    const main$ = new Rx.Subject()
    const s1$ = new Rx.Subject(1)
    const s2$ = new Rx.Subject(1)
    const s3$ = new Rx.Subject(1)
    const s4$ = new Rx.Subject(1)

    main$
     .scan((a, c) => a.concat(c.shareReplay(1)), [])
     .map(obs => Rx.Observable.combineLatest(obs))
     .switch()
     .map(v => v.filter(e => !!e))
     .map(v => v.join(','))
     .subscribe(v => $('#result').append('<br>' + v))

    main$.onNext(s1$)
    s1$.onNext(1)
    main$.onNext(s2$)
    s2$.onNext(void 0) // Since we can't use startWith
    main$.onNext(s3$)
    s3$.onNext(5)
    s1$.onNext(55)
    s2$.onNext(12)
    s2$.onNext(14)
    s3$.onNext(6)
    main$.onNext(s4$)
    s4$.onNext(999)
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
    <div id="result"></div>

Upvotes: 1
