Reputation: 3009
Q: Can RxJS operators be used to flatten an array, transform its items, then unflatten it, while maintaining a continuous stream (one that does not complete)?
For the simplified example here: https://stackblitz.com/edit/rxjs-a1791p?file=index.ts
If I follow this approach:
    mergeMap(next => next),
    switchMap(next => of(***transforming logic***)),
    toArray()
then the observable never completes, so the values do not come through. A take(1) could be added, but this is intended to be a continuous stream.
If I use:
    mergeMap(next => next),
    switchMap(next => of(***transforming logic***)),
    scan()
then this works great. However, the accumulator never resets when the source observable emits again, so the scan(), which is intended to accumulate the values back into an array, ends up combining the arrays from every pass. Can the accumulator be reset?
Obviously it can be accomplished with:
    switchMap(next => of(next.map(***transforming logic***)))
But my real-world example is an awful lot more complicated than this, and is tied into NgRx.
Upvotes: 1
Views: 215
Reputation: 14740
You don't need to use mergeMap or switchMap here. You would only need those if you were doing something asynchronous, such as taking the input value and creating an observable from it (e.g. to make an HTTP call).
By using of inside of mergeMap, you are essentially starting with an Observable, taking the unpacked value (an array), then turning it back into an Observable.
Looking at your StackBlitz:
The reason your first strategy never emits is that toArray() is applied at the level of the source (clicksFromToArrayButton), and that is never going to complete. toArray() only emits its array once the observable it is attached to completes.
If you really wanted to, you could nest it one level down, so that toArray() is applied at the level of your array (an observable created with from(), which completes after all values are emitted).
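    const transformedMaleNames = maleNames.pipe(
      // One inner observable per emitted array; from() completes after the last item.
      mergeMap(next => from(next).pipe(
        map(next => {
          const parts = next.name.split(' ');
          return { firstName: parts[0], lastName: parts[1] };
        }),
        // Collects the inner items back into an array when the inner observable completes.
        toArray()
      )),
    );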
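To check that nesting toArray() this way keeps the outer stream open, here is a minimal self-contained sketch. The Subject named maleNames$, the Person and SplitName types, and the sample names are assumptions standing in for the source in the question, not the code from the StackBlitz.

```typescript
import { Subject, from } from 'rxjs';
import { map, mergeMap, toArray } from 'rxjs/operators';

interface Person { name: string; }
interface SplitName { firstName: string; lastName: string; }

// Hypothetical stand-in for the never-completing source in the question.
const maleNames$ = new Subject<Person[]>();

const transformed$ = maleNames$.pipe(
  mergeMap(people =>
    from(people).pipe(
      map((person): SplitName => {
        const [firstName, lastName] = person.name.split(' ');
        return { firstName, lastName };
      }),
      // Emits once per source array, because from(people) completes.
      toArray()
    )
  )
);

const batches: SplitName[][] = [];
let completed = false;
transformed$.subscribe({
  next: batch => batches.push(batch),
  complete: () => { completed = true; },
});

// Two separate emissions produce two separate arrays; nothing is carried over.
maleNames$.next([{ name: 'John Smith' }, { name: 'Bob Jones' }]);
maleNames$.next([{ name: 'Al Green' }]);
```

After the two next() calls, batches holds two arrays (of two items and one item respectively) and completed is still false, so the outer stream can keep emitting.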
But... we don't really need to use from to create an observable, just so it can complete, just so toArray() can put it back together for you. We can use the regular map operator instead of mergeMap, along with Array.map():
    const transformedMaleNames = maleNames.pipe(
      map(nextArray => {
        return nextArray.map(next => {
          const parts = next.name.split(' ');
          return { firstName: parts[0], lastName: parts[1] };
        })
      })
    );
"this works, but isn't necessarily utilizing RxJS operators fully?"
Well, ya gotta use the right tool for the right job! In this case, you are simply transforming array elements, so Array.map() is perfect for this.
"But my real-world example is an awful lot more complicated than this"
If you are concerned about the code getting messy, you can just break the transformation logic out into its own function:
    const transformedMaleNames = maleNames.pipe(
      map(next => next.map(transformName))
    );

    function transformName(next) {
      const parts = next.name.split(' ');
      return { firstName: parts[0], lastName: parts[1] };
    }
Here's a working StackBlitz.
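For reference, a typed, self-contained sketch of the same map plus Array.map() idea follows. The Subject named source$, the Person and SplitName interfaces, and the sample data are assumptions added for illustration; only transformName mirrors the function above.

```typescript
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';

interface Person { name: string; }
interface SplitName { firstName: string; lastName: string; }

// Pure function: easy to unit test without any observable.
function transformName(person: Person): SplitName {
  const parts = person.name.split(' ');
  return { firstName: parts[0], lastName: parts[1] };
}

// Hypothetical long-lived source of arrays.
const source$ = new Subject<Person[]>();
const received: SplitName[][] = [];

source$
  .pipe(map(people => people.map(transformName)))
  .subscribe(result => received.push(result));

// Each emission is transformed on its own; there is no accumulator to reset.
source$.next([{ name: 'John Smith' }]);
source$.next([{ name: 'Bob Jones' }, { name: 'Al Green' }]);
```

Because the array is transformed inside a single map, each source emission yields exactly one output array. Note that a name without a space would leave lastName undefined at runtime, so real data may need extra handling.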
Upvotes: 1
Reputation: 11944
Here would be one approach:
    src$.pipe(
      mergeMap(
        arr => from(arr)
          .pipe(
            switchMap(item => /* ... */),
            toArray(),
          )
      )
    )
For each emitted array, mergeMap will create an inner observable (from(..)). There, from(array) will emit each item separately, allowing you to perform some logic in switchMap. Attaching toArray() at the end will give you an array with the results from switchMap's inner observable.
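A runnable variation on this pattern is sketched below. It swaps in concatMap for switchMap, which is a change from the snippet above: from(arr) emits all items synchronously, and switchMap unsubscribes from the previous inner observable whenever a new item arrives, so with a genuinely asynchronous inner observable the earlier items would be dropped. The lookupLength function is hypothetical, with of() standing in for something like an HTTP call.

```typescript
import { Observable, Subject, from, of } from 'rxjs';
import { concatMap, mergeMap, toArray } from 'rxjs/operators';

// Hypothetical per-item operation returning an observable.
function lookupLength(word: string): Observable<number> {
  return of(word.length);
}

// Hypothetical source that never completes.
const src$ = new Subject<string[]>();
const lengths: number[][] = [];

src$
  .pipe(
    mergeMap(arr =>
      from(arr).pipe(
        // Runs one inner observable at a time and preserves item order.
        concatMap(word => lookupLength(word)),
        // Emits when from(arr) and the last inner observable have completed.
        toArray()
      )
    )
  )
  .subscribe(result => lengths.push(result));

src$.next(['a', 'bb', 'ccc']);
src$.next(['dddd']);
```

With these inputs, lengths ends up as [[1, 2, 3], [4]]: one output array per source array, in the original item order.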
Upvotes: 1