Jakub Synowiec

Reputation: 5989

RxJS - Buffer up to n items and emit those buffers

I'm trying to buffer up to n items (say 5) from a stream, emitting the partially filled array on every item until the buffer is full, and after that a sliding window of the last n items.

Assuming that I have a stream like:

const stream = Rx.Observable.range(0, 6);

I want to emit:

[0]
[0, 1]
[0, 1, 2]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
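
Put another way, each emission is the last n items seen so far. The following is only a plain-JavaScript sketch of that expected output, without Rx; `expectedWindows` is a hypothetical helper name introduced for illustration and is not part of any library.

```javascript
// Plain-JavaScript reference for the expected emissions (no Rx involved).
// `expectedWindows` is a hypothetical helper used only for illustration.
function expectedWindows(items, n) {
  // After the (i + 1)-th item arrives, emit the last `n` items seen so far.
  return items.map((_, i) => items.slice(Math.max(0, i + 1 - n), i + 1));
}

const windows = expectedWindows([0, 1, 2, 3, 4, 5], 5);
console.log(windows);
// First five entries grow from [0] to [0, 1, 2, 3, 4];
// the sixth is the sliding window [1, 2, 3, 4, 5].
```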

I have two solutions so far and I'm wondering which one is closer to the "Rx way".

Scan and shift the accumulator if there are more than n elements:

stream.scan((acc, current) => {
  acc.push(current);

  if (acc.length > 5) {
    acc.shift();
  }

  return acc;
}, []);

Or scan the stream while there are no more than 5 elements and merge with buffer:

stream.scan((acc, current) => {
  acc.push(current);
  return acc;
}, [])
.takeWhile((x) => { return x.length < 5 })
.merge(stream.bufferWithCount(5));

Which approach is better and more consistent with Rx? Performance-wise, shifting the array is roughly 60% faster than merging, according to a quick test on jsPerf.

Or maybe there is a better solution?

Upvotes: 2

Views: 867

Answers (1)

Rafael Kallis

Reputation: 158

The first approach looks quite good. I would additionally treat acc as an immutable array. Simply put, do what you're doing, but without using push or shift. Here's how:

stream.scan((acc, current) => [...acc.slice(acc.length >= 5 ? 1 : 0), current], []);
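
To see why immutability matters here, this rough sketch runs both reducers over a plain array instead of a real observable. `scanArray` is a hypothetical stand-in for scan that records every accumulator the reducer returns, much as a subscriber that stores emissions would. The immutable reducer checks `acc.length >= n` before appending, so the window never grows past n.

```javascript
// `scanArray` is a hypothetical stand-in for an Rx scan (not a library function):
// it records every accumulator value the reducer returns.
function scanArray(items, reducer, seed) {
  const emitted = [];
  let acc = seed;
  for (const item of items) {
    acc = reducer(acc, item);
    emitted.push(acc);
  }
  return emitted;
}

const n = 5;

// Mutating reducer: pushes into and shifts the same array on every step.
const mutating = (acc, current) => {
  acc.push(current);
  if (acc.length > n) acc.shift();
  return acc;
};

// Immutable reducer: builds a new array on every step and leaves `acc` untouched.
const immutable = (acc, current) => [...acc.slice(acc.length >= n ? 1 : 0), current];

const input = [0, 1, 2, 3, 4, 5];
const fromMutating = scanArray(input, mutating, []);
const fromImmutable = scanArray(input, immutable, []);

// Every stored emission of the mutating reducer is the same array object,
// so all six entries now show the final state [1, 2, 3, 4, 5].
console.log(fromMutating.every((a) => a === fromMutating[0])); // true

// The immutable reducer keeps each intermediate window intact.
console.log(fromImmutable);
```

With the mutating version, a subscriber that logs each value immediately still sees the expected output, but anything that holds on to earlier emissions sees them change afterwards. The cost of the immutable version is one new array of at most n items per emission.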

Upvotes: 1
