prototype

Reputation: 7970

How to multiply and skip rows when piping streams with Node-CSV

I'd like to use Node CSV (https://csv.js.org/transform) to transform a CSV input stream of N wide rows with many columns into a CSV output stream of (m • N) narrower rows with fewer columns.

The input and output files are large, so I can't hold them in memory within practical limits, but streaming pipes should work.

But I can't figure out how to call CSV.transform. In the example below, it returns an array of m rows for each original row, but this fails because CSV.stringify() seems to be trying to interpret each array of m rows as a single row object.

The official example (https://csv.js.org/transform/examples/) and other examples I can find (e.g. https://stackoverflow.com/a/25159167/645715) each either accumulate the entire array in memory or return one row object per input row.

        const fs = require('fs')
        const CSV = require('csv')
        const inputStream = fs.createReadStream(INPUT_FILE)
        const outputStream = fs.createWriteStream(OUTPUT_FILE)
        inputStream
            .pipe(CSV.parse({columns: true}))
            .pipe(CSV.transform(function(row, callback) {
              var substack = []
              // turn 1 wide row into an array of m narrower rows
              // for (i=1 ... m) substack.push({...})
              return callback(null, substack) // this doesn't work
            }))
            .pipe(CSV.stringify({header: true}))
            .pipe(outputStream)
            .on('error', reject)
            .on('finish', resolve) // writable streams emit 'finish', not 'end'

Upvotes: 0

Views: 940

Answers (2)

Ravi MCA

Reputation: 2621

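I've used a custom Transform stream in object mode, as below. Calling callback() without pushing anything skips the row: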

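const { Transform } = require('stream');

const filterTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Assumes each chunk is a JSON-encoded row
    const row = JSON.parse(chunk.toString());

    // Ignore a row based on condition
    if (row.id !== 'xxx') {
      this.push(row);
    }

    // Signal that this chunk has been handled
    callback();
  },
});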
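
If the filter sits directly after CSV.parse({columns: true}), the rows should already arrive as plain objects, so the JSON.parse step is probably unnecessary. A minimal sketch under that assumption follows; `keepRow` and `createFilter` are hypothetical names, not part of Node CSV:

```javascript
const { Transform } = require('stream')

// Hypothetical predicate: keep every row whose id is not 'xxx'.
const keepRow = (row) => row.id !== 'xxx'

// Object-mode Transform that forwards only the rows accepted by the predicate.
// Assumes upstream already delivers parsed row objects.
function createFilter(predicate) {
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      if (predicate(row)) {
        this.push(row) // forward the row unchanged
      }
      callback() // nothing pushed means the row is skipped
    },
  })
}
```

It would be wired in as .pipe(createFilter(keepRow)) between the parse and stringify steps.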

Upvotes: 1

prototype

Reputation: 7970

Aha, the trick is to emit a 'data' event for each subrow instead of passing the rows to the callback:

            .pipe(CSV.transform(function(row, callback) {
              count++
              for (var i=0; i<m; i++) {
                let new_row = {count, i, ...} // create new row
                this.emit('data',new_row) // emit new row
              }
              callback()
            }) , {})
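
The same one-to-many idea can also be sketched with Node's built-in stream.Transform, using this.push() rather than emitting 'data' directly, so that records go through the stream's internal buffer. This is only an illustration: `expandRow`, `createExpander`, and the column names (id, v0, v1, ...) are assumptions, not taken from the real data.

```javascript
const { Transform } = require('stream')

// Hypothetical helper: split one wide row into m narrower rows.
// The column names (id, v0, v1, ...) are illustrative assumptions.
function expandRow(row, m) {
  const subrows = []
  for (let i = 0; i < m; i++) {
    subrows.push({ id: row.id, i, value: row[`v${i}`] })
  }
  return subrows
}

// Object-mode Transform that outputs m records for every input row.
function createExpander(m) {
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      for (const subrow of expandRow(row, m)) {
        this.push(subrow) // one output record per subrow
      }
      callback() // signal completion without passing a record
    },
  })
}
```

In the pipeline from the question, this would take the place of the CSV.transform step, e.g. .pipe(createExpander(m)), with CSV.parse before it and CSV.stringify after it. Only one input row and its m subrows are held at a time, so memory use should not grow with file size.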

Upvotes: 0
