amit
amit

Reputation: 2331

How to split a stream and recombine the sub-streams final results in RxJs

I have a source stream that can emit two types of messages. I would like to separate them into two separate streams and once the original stream complete, re-combine their final emitted value (or get undefined if none exists).

e.g

const split1$ = source$.pipe(
     filter(m) => m.kind === 1, 
     mergeMap(m) => someProcessing1());
const split2$ = source$.pipe(
     filter(m) => m.kind === 2, 
     mergeMap(m) => someProcessing2());
forkJoin(split1$, split2$).subscribe(
(output1, output2) => console.log(output1, output2));

The problem is that nothing guarantee that both split1$ and split2$ will emit values. If that happens, forkJoin never emits. With what can I replace forkJoin to emit a value whenever the source stream completes.

Upvotes: 1

Views: 381

Answers (1)

Roberto Zvjerković
Roberto Zvjerković

Reputation: 10127

About splitting the stream: https://www.learnrxjs.io/operators/transformation/partition.html

About "emit on complete", can't you just use complete callback instead? .subscribe(() => console.log('Emitted"), null, () => console.log('Completed'));

Otherwise you can use startWith operator to make sure something was emitted.

const [evens, odds] = source.pipe(partition(val => val % 2 === 0));
evens = evens.pipe(startWith(undefined)); // This will emit undefined before everything, so forkJoin will surely emit

Add startWith in the forkJoin constructor:

forkJoin(evens.pipe(startWith(undefined)), odds.pipe(startWith(undefined)))
  .subscribe(console.log))

Upvotes: 2

Related Questions