danday74

Reputation: 57116

I need something like combineLatest but when any observable emits

So my code is:

combineLatest([obs1, obs2]).subscribe(x => {
  console.log(x)
})

The log statement should be fired when EITHER observable emits. However, there is a gotcha as described in the docs:

combineLatest will not emit an initial value until each observable emits at least one value

I need it to emit even if one of the original observables has never emitted. How would I do this?

Upvotes: 3

Views: 4594

Answers (3)

Dominik Brázdil

Reputation: 493

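You can use the merge function. Note that it emits each value on its own as it arrives, not an array of the latest values from every source.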

merge(obs1, obs2).subscribe(x => {
  console.log(x);
});
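
To make the difference from combineLatest concrete, here is a minimal sketch, assuming obs1 and obs2 are Subjects (the question does not say what they are) and using a hypothetical emitted array to record what the subscriber receives. merge fires as soon as either source emits, but each emission is a single value rather than a combined pair.

```typescript
import { merge, Subject } from 'rxjs';

// Assumption: stand-ins for the question's obs1 and obs2.
const obs1 = new Subject<number>();
const obs2 = new Subject<string>();

// Hypothetical recorder for everything the subscriber receives.
const emitted: Array<number | string> = [];

merge(obs1, obs2).subscribe(x => emitted.push(x));

obs1.next(1);   // delivered even though obs2 has not emitted yet
obs1.next(2);
obs2.next('a');

console.log(emitted); // [ 1, 2, 'a' ]
```

If you need the latest value of every source on each emission, merge alone is not enough.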

Upvotes: 3

Rafi Henig

Reputation: 6422

Below is a simple example of how it might be implemented (rather than adding pipe(startWith(null)) to each Observable in the list).

import { from, Observable, ObservableInput, ObservedValueOf, Subscription } from 'rxjs';

// overloading:

function combineLatestAny<O1 extends ObservableInput<any>>(sources: [O1]): Observable<[ObservedValueOf<O1>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
function combineLatestAny<O extends ObservableInput<any>>(sources: O[]): Observable<ObservedValueOf<O>[]>;

// implementation:

function combineLatestAny(sources: ObservableInput<any>[]): Observable<any[]> {
  return new Observable<any[]>(observer => {
    const subscription = new Subscription();
    const values = new Array(sources.length);
    let active = sources.length;

    sources.forEach((source, index) => subscription.add(
      // from() (exported by 'rxjs') turns any ObservableInput into an Observable
      from(source).subscribe({
        // emit a copy so subscribers never share one mutable array
        next: value => { values[index] = value; observer.next([...values]) },
        error: error => { observer.error(error); subscription.unsubscribe() },
        // only complete when all input observables have completed
        complete: () => { if (--active === 0) observer.complete() }
      })
    ))

    // returning the Subscription tears down every inner subscription on unsubscribe
    return subscription
  })
}

Usage:

combineLatestAny([timer(1000), timer(3000).pipe(mapTo("string")), timer(2000).pipe(mapTo(true))])
  .subscribe(
    {
      next: console.log,
      error: console.error,
      complete: () => console.log("completed")
    }
  )

Upvotes: 0

Eliseo

Reputation: 57971

The typical approach is to use startWith on each observable, so every source has an initial value and combineLatest emits immediately:

combineLatest([
   obs1.pipe(startWith(null)),
   obs2.pipe(startWith(null))
]).subscribe(x => {
   console.log(x)
})
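
A runnable sketch of the same idea, assuming obs1 and obs2 are Subjects (the question does not say what they are) and using a hypothetical seen array to record the emissions. It shows that seeding each source with null produces an emission at subscribe time and then one for every later value from either source.

```typescript
import { combineLatest, Subject } from 'rxjs';
import { startWith } from 'rxjs/operators';

// Assumption: stand-ins for the question's obs1 and obs2.
const obs1 = new Subject<number>();
const obs2 = new Subject<string>();

// Hypothetical recorder for every combined pair the subscriber receives.
const seen: Array<[number | null, string | null]> = [];

combineLatest([
  obs1.pipe(startWith(null)),
  obs2.pipe(startWith(null)),
]).subscribe(pair => seen.push(pair));

obs1.next(1);   // emits although obs2 has produced no real value yet
obs2.next('a');

console.log(seen); // [ [ null, null ], [ 1, null ], [ 1, 'a' ] ]
```

The subscriber has to treat null as "no value yet", and the first emission is always the all-null array; if that is unwanted, skip(1) from rxjs/operators would drop it.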

Upvotes: 6
