Alexander Mills
Alexander Mills

Reputation: 100320

Transform stream, accessing internal data

I want to read a file (ideally with fs.createReadStream), pipe it through a transform process, and then write it (ideally using fs.createWriteStream) to another file.

I am using a transform stream (new stream.Transform()) for this, and it seems to work, except now I am stuck.

In the flush method of the transform stream I have:

 // Flush hook: if a partial last line is still held on the stream, push it
 // with a trailing newline; then clear the held value and signal completion.
 strm._flush = function (done) {
     var remainder = this._lastLineData;
     if (remainder) {
         this.push(remainder + '\n');
     }
     this._lastLineData = null;
     done();
 };

I want to be able to modify some data in the internal data structure where the transform stores the data. How can I access that internal data structure?

In other words, this.push is surely pushing onto some array; I want to be able to read from certain elements of that array in the _flush method.

Dropping into node core, I see this:

// Transform's push(): clears the needTransform flag on the transform state,
// then hands the chunk to Duplex's push() and returns whatever that returns.
Transform.prototype.push = function(chunk, encoding) {
  var transformState = this._transformState;
  transformState.needTransform = false;

  var duplexPush = Duplex.prototype.push;
  return duplexPush.call(this, chunk, encoding);
};

and

// Readable's push(): normalises a string chunk and hands it to
// readableAddChunk (appending, i.e. addToFront = false).
//
// In non-object mode a string chunk whose encoding (explicit, or the state's
// default) differs from the stream's own encoding is converted to a Buffer,
// and the encoding is cleared to '' since the chunk is no longer a string.
// Returns whatever readableAddChunk returns.
Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;

  if (!state.objectMode && typeof chunk === 'string') {
    encoding = encoding || state.defaultEncoding;
    if (encoding !== state.encoding) {
      // Buffer.from replaces the deprecated `new Buffer(string, encoding)`
      // constructor; the resulting bytes are the same.
      chunk = Buffer.from(chunk, encoding);
      encoding = '';
    }
  }

  return readableAddChunk(this, state, chunk, encoding, false);
};

and

// Adds a chunk to a readable stream's state, or emits it straight away.
//
//   stream     - the stream; used for emit() and read()
//   state      - the stream's readable state (buffer, length, flags)
//   chunk      - the data to add; null takes the EOF path
//   encoding   - encoding of a string chunk; when falsy the chunk may be run
//                through state.decoder
//   addToFront - when true the chunk is unshifted onto the front of
//                state.buffer instead of pushed onto the end (presumably the
//                unshift() path, judging by the error message below)
//
// Returns the result of needMoreData(state).
// NOTE(review): chunkInvalid, onEofChunk, emitReadable, maybeReadMore and
// needMoreData are defined outside this excerpt; their exact behaviour is not
// visible here.
function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  // A truthy result from chunkInvalid is reported as an 'error' event.
  var er = chunkInvalid(state, chunk);
  if (er) {
    stream.emit('error', er);
  } else if (chunk === null) {
    // null chunk: stop the current read and hand off to EOF handling.
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || chunk && chunk.length > 0) {
    // Object mode accepts any chunk; otherwise only non-empty chunks land here.
    if (state.ended && !addToFront) {
      var e = new Error('stream.push() after EOF');
      stream.emit('error', e);
    } else if (state.endEmitted && addToFront) {
      var e = new Error('stream.unshift() after end event');
      stream.emit('error', e);
    } else {
      // Decode only appended chunks that arrive without an explicit encoding.
      if (state.decoder && !addToFront && !encoding)
        chunk = state.decoder.write(chunk);

      // Front-inserted chunks leave the reading flag untouched.
      if (!addToFront)
        state.reading = false;

      // if we want the data now, just emit it.
      if (state.flowing && state.length === 0 && !state.sync) {
        stream.emit('data', chunk);
        stream.read(0);
      } else {
        // update the buffer info.
        // Each object counts as 1 in object mode; otherwise count chunk.length.
        state.length += state.objectMode ? 1 : chunk.length;
        // state.buffer is the array-like store that holds the queued chunks.
        if (addToFront)
          state.buffer.unshift(chunk);
        else
          state.buffer.push(chunk);

        if (state.needReadable)
          emitReadable(stream);
      }

      maybeReadMore(stream, state);
    }
  } else if (!addToFront) {
    // Empty chunk outside object mode: nothing is stored, but the read is over.
    state.reading = false;
  }

  return needMoreData(state);
}

so it seems to me there is no easy way to get at the internal data?

Upvotes: 0

Views: 1129

Answers (1)

Ivan Drinchev
Ivan Drinchev

Reputation: 19591

You can't do that.

What streams should be used for is to pipe data from one place to another. If you want to modify the whole data (before you pipe it to the next stream) then you should not use a stream at all. Think of them as an endless stream of data.

What you can do is:

// Chunks buffered by _transform until the stream is flushed.
var data = [];

// Buffer each incoming chunk instead of forwarding it downstream right away.
stream._transform = function(chunk, enc, done) {
   data.push(chunk);
   // The callback's first argument is an error. Passing the chunk there would
   // make the stream emit 'error', so call it with no arguments to signal
   // success without producing any output yet.
   done();
};

// Runs once all input has been consumed: the complete data is available here.
stream._flush = function(done) {
   /** ... Do something with data, then emit the result,
    *      e.g. this.push(Buffer.concat(data)) ... */
   data = []; // Drop the reference to the buffered chunks so they can be garbage-collected.
   done();
};

Upvotes: 1

Related Questions