Reputation: 57116
So my code is:
combineLatest([obs1, obs2]).subscribe((x => {
console.log(x)
})
The log statement should be fired when EITHER observable emits. However, there is a gotcha as described in the docs:
combineLatest will not emit an initial value until each observable emits at least one value
I need it to emit even if one of the original observables has never emitted. How would I do this?
Upvotes: 3
Views: 4594
Reputation: 493
You can use Merge method.
merge(obs1, obs2).subscribe(x => {
console.log(x);
});
Upvotes: 3
Reputation: 6422
Below is a simple example of how it might be implemented (rather than adding pipe(startWith(null))
to each Observable in the list).
// overloading:
function combineLatestAny<O1 extends ObservableInput<any>>(sources: [O1]): Observable<[ObservedValueOf<O1>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>>(sources: [O1, O2]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(sources: [O1, O2, O3]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(sources: [O1, O2, O3, O4]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
function combineLatestAny<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(sources: [O1, O2, O3, O4, O5, O6]): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
function combineLatestAny<O extends ObservableInput<any>>(sources: O[]): Observable<ObservedValueOf<O>[]>;
// implementation:
function combineLatestAny(sources) {
return new Observable<any>(observer => {
const subscribtion = new Subscription();
const values = new Array(sources.length);
let active: number = sources.length;
sources.forEach((source, index) => subscribtion.add(
source.subscribe({
next: value => { values[index] = value; observer.next(values) },
error: error => { observer.error(error); subscribtion.unsubscribe() },
// only complete when all input observables have completed
complete: () => (--active === 0) && observer.complete()
})
))
return {
unsubscribe: () => subscribtion.unsubscribe()
}
})
}
Usage:
combineLatestAny([timer(1000), timer(3000).pipe(mapTo("string")), timer(2000).pipe(mapTo(true))])
.subscribe(
{
next: console.log,
error: console.error,
complete: () => console.log("completed")
}
)
Upvotes: 0
Reputation: 57971
typical use startWith with each observable
combineLatest([
obs1.pipe(startWith(null)),
obs2.pipe(startWith(null)]
).subscribe((x => {
console.log(x)
})
Upvotes: 6