Picci

Reputation: 17762

A variation of combineLatest that completes when the first of the observables passed as parameters completes

The combineLatest operator returns an Observable that completes only when all of the observables passed to it as parameters have completed.

Is there a way to create an Observable that behaves like the one returned by combineLatest, with the only difference being that it completes as soon as the first of the Observables passed as parameters completes?

Upvotes: 7

Views: 3604

Answers (2)

cartant

Reputation: 58400

Yes, you can do something like this:

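function combineLatestUntilFirstComplete(...args) {
  // Share each source so combineLatest and takeUntil use a single subscription.
  const shared = args.map(a => a.share());
  return Rx.Observable
    .combineLatest(...shared)
    // last() emits a source's final value once that source completes;
    // the first such emission makes takeUntil complete the combined stream.
    .takeUntil(Rx.Observable.merge(...shared.map(s => s.last())));
}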

const a = Rx.Observable.interval(100).map(index => `a${index}`).take(5);
const b = Rx.Observable.interval(200).map(index => `b${index}`).take(5);

combineLatestUntilFirstComplete(a, b).subscribe(
  value => console.log(JSON.stringify(value)),
  error => console.error(error),
  () => console.log("complete")
);
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

The implementation takes values from the observable returned from the internal call to combineLatest until one of the source observables emits its last value.

Note that the source observables are shared, so that the extra subscriptions made for the takeUntil call don't cause a second subscription to each cold source observable.
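To make the completion rule itself concrete, here is a minimal dependency-free sketch. It is not RxJS and not the implementation above: a "source" is assumed to be a plain function that takes an observer and returns an unsubscribe function, and the names createSubject and combineLatestUntilFirstCompleteSketch are hypothetical helpers made up for this illustration.

```javascript
// Hypothetical hand-rolled subject: a subscribe function with next/complete
// methods attached, used here only to drive the sketch by hand.
function createSubject() {
  const observers = new Set();
  const subscribe = observer => {
    observers.add(observer);
    return () => observers.delete(observer);
  };
  subscribe.next = value => [...observers].forEach(o => o.next(value));
  subscribe.complete = () => [...observers].forEach(o => o.complete());
  return subscribe;
}

// Sketch of the desired behaviour: emit the latest values once every source
// has emitted, and complete as soon as ANY source completes.
function combineLatestUntilFirstCompleteSketch(...sources) {
  return observer => {
    const latest = new Array(sources.length);
    const hasValue = new Array(sources.length).fill(false);
    const unsubscribers = [];
    let done = false;

    // Stop listening to every source; safe to call more than once.
    const teardown = () => {
      done = true;
      unsubscribers.splice(0).forEach(unsubscribe => unsubscribe());
    };

    sources.forEach((source, index) => {
      if (done) return; // an earlier source already completed synchronously
      unsubscribers.push(source({
        next(value) {
          if (done) return;
          latest[index] = value;
          hasValue[index] = true;
          if (hasValue.every(Boolean)) observer.next(latest.slice());
        },
        complete() {
          if (done) return;
          teardown();
          observer.complete();
        }
      }));
    });

    if (done) teardown(); // release anything subscribed during a synchronous completion
    return teardown;
  };
}

const a = createSubject();
const b = createSubject();
const log = [];
combineLatestUntilFirstCompleteSketch(a, b)({
  next: value => log.push(value),
  complete: () => log.push("complete")
});

a.next("a0");  // nothing emitted yet: b has no value
b.next("b0");  // emits ["a0", "b0"]
a.next("a1");  // emits ["a1", "b0"]
a.complete();  // the combined stream completes here
b.next("b1");  // ignored: the sketch has already unsubscribed from b
console.log(JSON.stringify(log));
```

The sketch tracks completion directly, so it has no need for sharing; in the RxJS version the sharing matters because the sources are subscribed to by both combineLatest and the takeUntil notifier.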

Upvotes: 6

Mark van Straten

Reputation: 9425

const a = Rx.Observable.interval(400).share();
const b = Rx.Observable.interval(600).share();
const c = Rx.Observable.interval(1000).take(3).share();

const combined = Rx.Observable.combineLatest(a, b, c)
  .takeUntil(
    Rx.Observable.merge(
      a.ignoreElements().concat(Rx.Observable.of('fin-a')).do(console.log),
      b.ignoreElements().concat(Rx.Observable.of('fin-b')).do(console.log),
      c.ignoreElements().concat(Rx.Observable.of('fin-c')).do(console.log)
    )
  );

combined
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.js"></script>

You need to .share() the input observables because each one is used twice: once by .combineLatest() and once by the .takeUntil() notifier that completes the combined stream.

I used .ignoreElements() inside the .takeUntil() notifier to discard every value. Only when a source stream (a, b, or c) completes does .concat() append a final message such as 'fin-a', which signals .takeUntil() to complete our subscription to the .combineLatest(). This also works if any of a, b, or c emits no values at all.
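The "drop all values, then emit one marker on completion" idea can be sketched without RxJS as follows. This is only an illustration under the assumption that a source is a plain function taking an observer; completionSignal and emptySource are hypothetical names, not library APIs.

```javascript
// Hypothetical helper: turn any source into a stream that emits a single
// marker when the source completes, ignoring the source's own values.
function completionSignal(source, marker) {
  return observer => source({
    next() {},                 // discard every value (the ignoreElements step)
    complete() {               // append one marker on completion (the concat step)
      observer.next(marker);
      observer.complete();
    }
  });
}

// A source that completes immediately without emitting anything.
const emptySource = observer => {
  observer.complete();
  return () => {};
};

const seen = [];
completionSignal(emptySource, "fin-empty")({
  next: value => seen.push(value),
  complete: () => seen.push("complete")
});
console.log(JSON.stringify(seen));
```

Because the marker is produced by the completion itself rather than by the source's last value, the signal still fires for a source that never emits.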

Upvotes: 3
