StephenK

Reputation: 33

Node - Readable stream pipe() overwrite previous streams in a for loop

I am trying to stream a data collection to multiple files with the code below:

for (var key in data) {
  // skip if collection length is 0
  if (data[key].length > 0) {
    // Use the key and jobId to open file for appending
    let filePath = folderPath + '/' + key + '_' + jobId + '.txt';

    // Using stream to append the data output to file, which should perform better when file gets big
    let rs = new Readable();
    let n = data[key].length;
    let i = 0;

    rs._read = function () {
      rs.push(data[key][i++]);

      if (i === n) {
        rs.push(null);
      }
    };

    rs.pipe(fs.createWriteStream(filePath, {flags: 'a', encoding: 'utf-8'}));

  }
}

However, all files end up populated with the same data, namely the array for the last key in the data object. It seems the readable stream is overwritten on each loop iteration, and the pipe() to the writable stream doesn't start until the for loop has finished. How is that possible?

Upvotes: 3

Views: 2088

Answers (2)

Damaged Organic

Reputation: 8467

This is happening because the key you define in the loop statement is not block-scoped (var is function-scoped). That's not a problem at first, but once you create a closure over it inside the rs._read function, every stream's reads go through the same binding, and by the time those reads actually run it holds the last key, so every file receives the last collection's data.
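The effect can be shown with a minimal sketch, with plain deferred callbacks standing in for the stream reads (the data object here is just a stand-in):

```javascript
const data = { a: [1], b: [2], c: [3] };

// Collect callbacks now, invoke them later - just like _read,
// which is only called after the loop has finished.
const withVar = [];
for (var key in data) {
  withVar.push(() => key); // all three close over the same `key` binding
}

const withLet = [];
for (let k in data) {
  withLet.push(() => k); // each iteration gets a fresh `k` binding
}

console.log(withVar.map(fn => fn())); // [ 'c', 'c', 'c' ]
console.log(withLet.map(fn => fn())); // [ 'a', 'b', 'c' ]
```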

And while we're at it, here is a small refactoring to make the code cleaner and more reusable:

const fs = require('fs');
const { Readable } = require('stream');

const writeStream = (folderPath, key, jobId) => {
    const filePath = `${folderPath}/${key}_${jobId}.txt`;

    return fs.createWriteStream(filePath, {
        flags: 'a', encoding: 'utf-8'
    });
};

// data is an object, not an array, so iterate over its entries
Object.entries(data).forEach(([key, value]) => {
    const n = value.length;

    if (n > 0) {
        const rs = new Readable();

        let i = 0;

        rs._read = () => {
            rs.push(value[i++]);
            if (i === n) rs.push(null);
        };

        rs.pipe(writeStream(folderPath, key, jobId));
    }
});

Upvotes: 0

Kieper

Reputation: 315

So the reason your code is not working is that the rs._read method is called asynchronously, and your key variable is function-scoped (because of the var keyword).

Every rs stream you create closes over the same key variable, so by the end of the main loop each of those callbacks sees the same, final value. If you change var to let, a new key binding is created for each iteration, which solves the problem: each _read function gets its own copy of key instead of a shared one.

If you change it to let it should work.

Upvotes: 2
