Reputation: 17762
combineLatest
operator returns an Observable that completes when all of the observables passed in as parameters to combineLatest
complete.
Is there a way to create an Observable that behaves as the one returned by combineLatest
with the only difference that it completes when the first of the Observables passed as parameters complete?
Upvotes: 7
Views: 3604
Reputation: 58400
Yep, you can; you can do something like this:
function combineLatestUntilFirstComplete(...args) {
const shared = args.map(a => a.share());
return Rx.Observable
.combineLatest(...shared)
.takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}
const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);
combineLatestUntilFirstComplete(a, b).subscribe(
value => console.log(JSON.stringify(value)),
error => console.error(error),
() => console.log("complete")
);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
The implementation takes values from the observable returned from the internal call to combineLatest
until one of the source observables emits its last value.
Note that the source observables are shared, so that the subscriptions due to the takeUntil
call don't effect secondary subscriptions to cold source observables.
Upvotes: 6
Reputation: 9425
const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();
const combined = Rx.Observable.combineLatest(a, b, c)
.takeUntil(
Rx.Observable.merge(
a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
)
);
combined
.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>
You need to .share()
the input observables because you need them twice, once for the .combineLatest()
and once for the .takeUntil
to complete your observable stream.
I used the .ignoreElements()
in the .takeUntil
to ignore any values and only when the source stream (either a
,b
, or c
) complete .concat
a 'final' message to it to signal to the .takeUntil
to complete our subscription to the .combineLatest
. This also works if either a,b,c
do not emit any values.
Upvotes: 3