hielsnoppe

Reputation: 2869

How to chunk a stream of objects?

I have a stream of objects (objectMode: true) and want to chunk it into arrays of equal size, so that I can pipe them to another function that accepts arrays. I found the following modules that seem to accomplish that for buffers, but not for objects:

Do you know of a module that can do this for object streams or is there an obvious simple DIY solution?

Upvotes: 3

Views: 1823

Answers (1)

hielsnoppe

Reputation: 2869

I found the following to work:

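var stream = require('stream');
var util = require('util');

// Transform stream that collects incoming objects into arrays of chunkSize items.
function ItemCollector (chunkSize) {

  this._buffer = [];
  this._chunkSize = chunkSize;

  stream.Transform.call(this, { objectMode: true });
}

util.inherits(ItemCollector, stream.Transform);

ItemCollector.prototype._transform = function (chunk, encoding, done) {

  this._buffer.push(chunk);

  // Emit the buffer as soon as it is full, then start a new one.
  if (this._buffer.length === this._chunkSize) {

    this.push(this._buffer);
    this._buffer = [];
  }

  done();
};

ItemCollector.prototype._flush = function (done) {

  // Emit the remaining items (a possibly shorter last chunk), if any.
  if (this._buffer.length > 0) {

    this.push(this._buffer);
  }

  // Signal that flushing is complete so the stream can end.
  done();
};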

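Use like this (the pipe target, here otherFunction, must be a Writable stream in object mode, not a plain function):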

objectStream.pipe(new ItemCollector(10)).pipe(otherFunction)
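
On newer Node.js versions the same idea can probably be written without util.inherits, by passing transform and flush directly to the Transform constructor. The following is only a minimal sketch: createChunker and collectChunks are hypothetical names that do not appear in the answer above, and it assumes a Node.js version that provides Readable.from and async iteration over streams.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper (not part of the original answer): returns an
// object-mode Transform that groups incoming items into arrays of `size`.
function createChunker(size) {
  let buffer = [];

  return new Transform({
    objectMode: true,

    transform(item, encoding, done) {
      buffer.push(item);

      // Emit a chunk as soon as it is full, then start a new one.
      if (buffer.length >= size) {
        this.push(buffer);
        buffer = [];
      }

      done();
    },

    flush(done) {
      // Emit the last, possibly shorter, chunk when the input ends.
      if (buffer.length > 0) {
        this.push(buffer);
      }

      done();
    }
  });
}

// Hypothetical helper: drains `source` through the chunker and resolves
// with an array containing all emitted chunks.
async function collectChunks(source, size) {
  const chunks = [];

  for await (const chunk of source.pipe(createChunker(size))) {
    chunks.push(chunk);
  }

  return chunks;
}

// Example: five items in chunks of two resolve to [[1, 2], [3, 4], [5]].
collectChunks(Readable.from([1, 2, 3, 4, 5]), 2).then(console.log);
```

Keeping the buffer in a closure instead of on the stream instance is a design choice made here for brevity; the prototype-based version above behaves the same way.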

Upvotes: 3
