Reputation: 2331
I have a source stream that can emit two types of messages. I would like to separate them into two separate streams and once the original stream complete, re-combine their final emitted value (or get undefined if none exists).
e.g
const split1$ = source$.pipe(
filter(m) => m.kind === 1,
mergeMap(m) => someProcessing1());
const split2$ = source$.pipe(
filter(m) => m.kind === 2,
mergeMap(m) => someProcessing2());
forkJoin(split1$, split2$).subscribe(
(output1, output2) => console.log(output1, output2));
The problem is that nothing guarantee that both split1$ and split2$ will emit values. If that happens, forkJoin never emits. With what can I replace forkJoin to emit a value whenever the source stream completes.
Upvotes: 1
Views: 381
Reputation: 10127
About splitting the stream: https://www.learnrxjs.io/operators/transformation/partition.html
About "emit on complete", can't you just use complete
callback instead? .subscribe(() => console.log('Emitted"), null, () => console.log('Completed'))
;
Otherwise you can use startWith
operator to make sure something was emitted.
const [evens, odds] = source.pipe(partition(val => val % 2 === 0));
evens = evens.pipe(startWith(undefined)); // This will emit undefined before everything, so forkJoin will surely emit
Add startWith
in the forkJoin
constructor:
forkJoin(evens.pipe(startWith(undefined)), odds.pipe(startWith(undefined)))
.subscribe(console.log))
Upvotes: 2