Reputation: 696
I was under the impression combineLast
was a good fit at first, but as I read the docs it seems it isn’t: “Be aware that combineLatest
will not emit an initial value until each observable emits at least one value.”… And of course I hit exactly that exception. I tried forkJoin
and merge
in various combinations, but I can’t get it right.
The use-case is pretty straightforward, the method someObs
returns 0
or more observables, over which I loop. Based on a value on the SomeObs
object I push a newly constructed observable, OtherObs[]
, to an Array
of Observable<OtherObs[]>
. This array "needs" to be merged into a single observable, and before returning it I’d like to do some transformations.
Specifically I’m having difficulty replacing the combineLast
operator with something appropriate…
public obs(start: string, end: string): Observable<Array<OtherObs>> {
return this.someObs().pipe(
mergeMap((someObs: SomeObs[]) => {
let othObs: Array<Observable<OtherObs[]>> = [];
someObs.forEach((sobs: SomeObs) => {
othObs.push(this.yetAnotherObs(sobs));
});
return combineLatest<Event[]>(othObs).pipe(
map(arr => arr.reduce((acc, cur) => acc.concat(cur)))
);
})
);
}
private yetAnotherObs(): Observable<OtherObs[]> {
/// ...
}
private somObs(): Observable<SomeObs[]> {
/// ...
}
Upvotes: 9
Views: 23239
Reputation: 2620
The "problem" of combineLatest
is (like you said) that it "will not emit an initial value until each observable emits at least one value". But this is not a problem because you can use the RxJS operator startWith.
So your observable gets an initial value and combineLatest
works like a charme ;)
import { of, combineLatest } from 'rxjs';
import { delay, map, startWith } from 'rxjs/operators';
// Delay to show that this observable needs some time
const delayedObservable$ = of([10, 9, 8]).pipe(delay(5000));
// First Observable
const observable1$ = of([1, 2, 3]);
// Second observable that starts with empty array if no data
const observable2$ = delayedObservable$.pipe(startWith([]));
// Combine observable1$ and observable2$
combineLatest(observable1$, observable2$)
.pipe(
map(([one, two]) => {
// Because we start with an empty array, we already get data from the beginning
// After 5 seconds we also get data from observable2$
console.log('observable1$', one);
console.log('observable2$', two);
// Concat only to show that we map boths arrays to one array
return one.concat(two);
})
)
.subscribe(data => {
// After 0 seconds: [ 1, 2, 3 ]
// After 5 seconds: [ 1, 2, 3, 10, 9, 8 ]
console.log(data);
});
Upvotes: 16