ijjo

Reputation: 525

How can I divide an RxJS Observable stream into equal N-sized arrays?

Let's say I have an Observable stream created from an array like so:

const items: Items = [{},{},{},{},{},...];
const obs$ = from(items).pipe(
    mergeMap(items => {
        return this.getData(items);
    })
);

As this code stands, getData() is called once for each item in the array. What I want instead is to divide the items array into N equal-sized arrays and have those arrays emitted, so that getData() is called with each array rather than with each individual item of the original items array.

Basically I need a variation of the toArray() operator but on only portions of the stream that I pre-define.

Upvotes: 3

Views: 155

Answers (1)

Goga Koreli

Reputation: 2947

You need the buffer operator, which comes in several variations: buffer, bufferCount, bufferTime, bufferToggle, and bufferWhen. See the official buffer documentation.

For example, in your case, splitting the stream into arrays of size 3 would be:

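import { from } from 'rxjs';
import { bufferCount, mergeMap } from 'rxjs/operators';

const items: Items = [{},{},{},{},{},...];
const obs$ = from(items).pipe(
    // Collect items into arrays of 3; the last array may be shorter.
    bufferCount(3),
    // Each emission is now an array of items, not a single item.
    mergeMap(batch => {
        return this.getData(batch);
    })
);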
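
If it helps, here is a minimal self-contained sketch of the same idea that you can run outside your class. It assumes a hypothetical getData() stand-in that just echoes the batch it receives, and the helper names (inBatchesOf, inNBatches, collect) are made up for illustration. The second helper covers the case where you want roughly N arrays rather than arrays of a fixed size, by deriving the buffer size from the array length.

```typescript
import { from, of, Observable } from 'rxjs';
import { bufferCount, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for this.getData(): echoes the batch it receives.
function getData<T>(batch: T[]): Observable<T[]> {
  return of(batch);
}

// Emit the items in batches of `size`; the final batch may be shorter.
function inBatchesOf<T>(items: T[], size: number): Observable<T[]> {
  return from(items).pipe(
    bufferCount(size),
    mergeMap(batch => getData(batch)),
  );
}

// Split the items into at most `n` batches of near-equal size.
function inNBatches<T>(items: T[], n: number): Observable<T[]> {
  const size = Math.max(1, Math.ceil(items.length / n));
  return inBatchesOf(items, size);
}

// from() on an array emits synchronously, so with the synchronous
// getData() above the results are available right after subscribe().
function collect<T>(source: Observable<T>): T[] {
  const out: T[] = [];
  source.subscribe(value => out.push(value));
  return out;
}

const bySize = collect(inBatchesOf([1, 2, 3, 4, 5, 6, 7], 3));
console.log(bySize);  // [[1, 2, 3], [4, 5, 6], [7]]

const byCount = collect(inNBatches([1, 2, 3, 4, 5, 6, 7], 2));
console.log(byCount); // [[1, 2, 3, 4], [5, 6, 7]]
```

Note that bufferCount only guarantees equal sizes when the item count is a multiple of the buffer size; otherwise the last array holds the remainder. If getData() is asynchronous, mergeMap may also deliver results out of order, so concatMap could be a better fit when order matters.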

Upvotes: 10
