Reputation: 384
I'm trying to implement this marble diagram, with the hipotesis of have a N number of sN$, and I'm adding this streams to the main$.
s1$ +--1--------------------99--------------------->
s2$ +------3--------7------------------------------>
main$ +---[1]-[1, 3]---[1, 7]---[99, 7]-------------->
Right now I have a aproximation, but with the "repetitions"
const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()
main$
.scan((a, c) => [...a, c], [])
.subscribe(v => console.log(v))
s1$.subscribe(x => main$.onNext(x))
s2$.subscribe(x => main$.onNext(x))
s1$.onNext(3)
s2$.onNext(1)
s1$.onNext(6)
s2$.onNext(44)
/*
Expect:
[3]
[3, 1]
[6, 1]
[6, 44]
*/
/*
What I have:
[3]
[3, 1]
[3, 1, 6]
[3, 1, 6, 44]
*/
There's a way of doing this? Also I tried to add the streams sN$ into main$:
const main$ = new Rx.Subject()
const s1$ = new Rx.Subject()
const s2$ = new Rx.Subject()
main$
.mergeAll()
.scan((a, c) => [...a, c], [])
.subscribe(
(v) => console.log(v)
)
main$.onNext(s1$)
main$.onNext(s2$)
s1$.onNext(3)
s2$.onNext(1)
s1$.onNext(6)
s2$.onNext(44)
Upvotes: 0
Views: 189
Reputation: 384
I solve finally the problem with some filter the nulls that I begin on the startWith()
:
main$
.scan((a, c) => [...a, c.startWith(null).shareReplay(1)], [])
.map(obs => Observable.combineLatest(obs))
.switch()
.map((x) => x.filter((x) => x != null))
.filter((x) => x.length)
Looks un-readable (as any sequence of Rx, but If you draw the marble makes totally sense!)
Upvotes: 0
Reputation: 7546
You can use combineLatest. While that still require every stream to start with a value, you can prefix a null
value to make every stream start with something using startWith.
const source = Rx.Observable.combineLatest(
s1.startWith(void 0),
s2.startWith(void 0),
s3.startWith(void 0),
(s1, s2, s3) => [s1, s2, s3])
Optional you can remove undefined
values from the resulting array.
Now, we can extend that to work with a variable list of streams. Credits to @xgrommx.
main$
.scan((a, c) => a.concat(c), [])
.switch(obs => Rx.Observable.combineLatest(obs))
We can also use c.shareReplay(1)
to make streams remember there last value when we switch
. That however, wont combine with c.startWith(void 0)
, so we can use either one or the other.
Example:
const main$ = new Rx.Subject()
const s1$ = new Rx.Subject(1)
const s2$ = new Rx.Subject(1)
const s3$ = new Rx.Subject(1)
const s4$ = new Rx.Subject(1)
main$
.scan((a, c) => a.concat(c.shareReplay(1)), [])
.map(obs => Rx.Observable.combineLatest(obs))
.switch()
.map(v => v.filter(e => !!e))
.map(v => v.join(','))
.subscribe(v => $('#result').append('<br>' + v))
main$.onNext(s1$)
s1$.onNext(1)
main$.onNext(s2$)
s2$.onNext(void 0) // Since we can't use startWith
main$.onNext(s3$)
s3$.onNext(5)
s1$.onNext(55)
s2$.onNext(12)
s2$.onNext(14)
s3$.onNext(6)
main$.onNext(s4$)
s4$.onNext(999)
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script>
<div id="result"></div>
Upvotes: 1