Reputation: 77
I have two streams. Let's say:
const firstStream = Rx.of([
{
first: 'first',
}, {
third: 'third',
}
]);
const secondStream = Rx.of([
{
second: 'second'
}, {
fourth: 'fourth'
}
]);
Now I want a stream that combines the result of these two streams and maps the result array sorted as follows:
const resultArr = [
{
first: 'first',
},
{
second: 'second'
},
{
third: 'third',
},
{
fourth: 'fourth'
}
];
I tried to use combineLatest
with RxJS flatmap
operator, but that did not work out. I provided a stackblitz playground to test around: StackBlitz
I'm sure there are plenty of ways to do this. Maybe someone can help me out :)
Upvotes: 2
Views: 2726
Reputation: 3588
enter code hereAs you said your streams first complete and after that, you need the sorted value as single output of the stream, so I would recommend the forkJoin
operator, which operator will Wait for Observables to complete and then combine last values they emitted.
const { of, forkJoin } = rxjs;
const { map } = rxjs.operators;
let a$ = of([1, 8, 10, 4]);
let b$ = of([3, 5, 43, 0]);
forkJoin(a$, b$)
.pipe(
map(([a, b]) => [...a, ...b]),
map(x => x.sort((a, b) => a - b))
)
.subscribe(x => {
console.log('Sorted =>', x);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js" integrity="sha256-mNXCdYv896VtdKYTBWgurbyH+p9uDUgWE4sYjRnB5dM=" crossorigin="anonymous"></script>
Upvotes: 1
Reputation: 14189
const { from, merge } = rxjs;
const { reduce, map, mergeMap } = rxjs.operators
const a = from(['first', 'third']);
const b = from(['second', 'fourth']);
const sortMap = {
first: 0,
second: 1,
third: 2,
fourth: 4,
}
merge(a, b).pipe(
// wait until every observable has completed,
// zip all the values into an array
reduce((res, item) => res.concat(item), []),
// sort the array accordingly to your needs
map(list => list.sort((a, b) => sortMap[a] - sortMap[b])),
// flatten the array into a sequence
mergeMap(list => list),
).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js" integrity="sha256-mNXCdYv896VtdKYTBWgurbyH+p9uDUgWE4sYjRnB5dM=" crossorigin="anonymous"></script>
Upvotes: 4