Reputation: 5989
I'm trying to buffer up to n (assume 5) items from a stream but emit partially filled arrays until the buffer is full.
Assuming that I have a stream like:
const stream = Rx.Observable.range(0, 6);
I want to emit:
[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
I have two solutions so far, and I'm wondering which one is closer to the "Rx way".
Scan and shift the accumulator if there are more than n elements:
stream.scan((acc, current) => {
    acc.push(current);
    if (acc.length > 5) {
        acc.shift();
    }
    return acc;
}, []);
Or scan the stream while the accumulator has fewer than 5 elements and merge with a buffer:
stream.scan((acc, current) => {
    acc.push(current);
    return acc;
}, [])
    .takeWhile((x) => { return x.length < 5 })
    .merge(stream.bufferWithCount(5));
Which approach is better and more consistent with Rx? Performance-wise, shifting the array is roughly 60% faster than merging, according to a quick test on jsPerf.
Or is there a better solution?
Upvotes: 2
Views: 867
Reputation: 158
The first approach looks quite good. I would additionally treat acc as an immutable array. Simply put, do what you're already doing but without push or shift, so that every emitted array is a fresh copy rather than the same mutated instance. Here's how:
stream.scan((acc, current) => [...acc.slice(acc.length >= 5 ? 1 : 0), current], []);
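To check the window logic without pulling in Rx, here is a minimal sketch that runs the same kind of reducer over a plain array. The names slidingWindow and scanArray are hypothetical helpers made up for this illustration, not part of Rx; scanArray only imitates how scan emits every intermediate accumulator. Note that the length check happens before the new item is appended, so the comparison has to be >= for the window to stay at the requested size.

```javascript
// Hypothetical helper: builds a scan-style reducer that keeps at most `size` items.
function slidingWindow(size) {
  return (acc, current) =>
    // The check runs before `current` is appended, so drop the oldest
    // item as soon as the window already holds `size` items.
    [...acc.slice(acc.length >= size ? 1 : 0), current];
}

// Hypothetical stand-in for Rx's scan over a plain array:
// collects every intermediate accumulator value in order.
function scanArray(items, reducer, seed) {
  const emitted = [];
  let acc = seed;
  for (const item of items) {
    acc = reducer(acc, item);
    emitted.push(acc);
  }
  return emitted;
}

const windows = scanArray([0, 1, 2, 3, 4, 5], slidingWindow(5), []);
console.log(windows);
// [[0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4], [1, 2, 3, 4, 5]]
```

Because the reducer never mutates acc, each emitted array is a distinct object, so a subscriber that holds on to an earlier emission will not see it change later.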
Upvotes: 1