nicholaswmin

Reputation: 22989

Pausing a readable stream in Node.js

I'm using csvtojson, a neat library for processing CSV files.

I've got a use case where I need to process a large (>2 million rows) CSV and insert it into a DB.

To do this without running into memory issues, I intend to process the CSV as a stream, pausing the stream every 10000 rows, inserting the rows into my DB and then resuming the stream.

For some reason I can't seem to pause the stream.

Take for example the following code:

const fs = require("fs");
const csv = require("csvtojson");

const rs = fs.createReadStream("./foo.csv");
rs.pause();

let count = 0;

csv()
.fromStream(rs)
.on("json", (json) => {
  count++;
  console.log(count);
})
.on("done", () => {
  cb(null, count);
})
.on("error", (err) => {
  cb(err);
})

count is logged 200 times (that's how many rows my CSV has). I was expecting it not to log anything, since the stream is paused before being passed to fromStream()

Upvotes: 10

Views: 16644

Answers (3)

Nishant Desai

Reputation: 1722

I have leveraged the fact that csvtojson also has a fromString(...) method, and used it as below.

  1. Use the line-by-line package to read a fixed number of lines, i.e. 10000, and store them in an array.
  2. Pause the line-by-line reader using lr.pause().
  3. Insert the headers line at index 0 (if your CSV file has a header line, use a simple conditional to skip the first line returned by the line-by-line reader).
  4. Join all lines with the EOL character, which gives you a string representation of those 10000 lines of the CSV file.
  5. Use csvtojson's .fromString(...) to convert the string representation of the block into JSON objects and insert them into the db.
  6. Resume the reader via lr.resume() and repeat until the line-by-line reader emits the 'end' event.

Here's the complete code:

const CSVToJSON = require("csvtojson");
const LineByLineReader = require("line-by-line");
const { EOL } = require("os");

const BLOCK_LIMIT = 10000;

// placeholder - replace with your CSV's actual header row, e.g. "field1,field2,..."
const headers = "field1,field2";

let lines = [];
let isFirstLineProcessed = false;

const lr = new LineByLineReader("./foo.csv");

lr
.on("line", (line) => {

    // remove this if statement if your CSV does not contain a headers line
    if (!isFirstLineProcessed) {
        isFirstLineProcessed = true;
        return;
    }

    lines.push(line);

    if (lines.length === BLOCK_LIMIT) {
        lr.pause();

        // insert the headers string at index 0 so each block is a valid CSV document
        lines.splice(0, 0, headers);

        // join all lines using the EOL character to form a valid CSV string
        const csvBlockString = lines.join(EOL);
        const entries = [];

        lines = [];

        CSVToJSON()
            .fromString(csvBlockString)
            .on("json", (json) => {
                entries.push(json);
            })
            .on("done", () => {
                // _insertEntries and db are your own batch-insert function and DB handle
                this._insertEntries(db, entries, () => {
                    lr.resume();
                });
            });
    }
})
.on("end", () => {
    console.log("done");
});

Upvotes: 4

nicholaswmin

Reputation: 22989

Here's a solution suggested by the creator of the library, tracked in this Issue:

const { Writable } = require("stream");

// myDb.save is a placeholder for your own batch-insert function
let tmpArr = [];

rs.pipe(csv({}, { objectMode: true })).pipe(new Writable({
  write: function (json, encoding, callback) {
    tmpArr.push(json);
    if (tmpArr.length === 10000) {
      myDb.save(tmpArr, function () {
        tmpArr = [];
        callback();
      });
    } else {
      callback();
    }
  },
  objectMode: true
}))
.on("finish", function () {
  if (tmpArr.length > 0) {
    myDb.save(tmpArr, function () {
      tmpArr = [];
    });
  }
});
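This works because the Writable's write() only invokes callback() once the 10000-row batch has been saved. While the callback is pending, backpressure propagates up the pipe and the source stream is paused automatically, with no explicit pause()/resume() calls needed.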

I've actually managed to emulate pausing by unpiping like so, but it's not ideal:

let rows = [];

const csvParser = csv()
.fromStream(rs)
.on("json", (json) => {
  rows.push(json);
  if (rows.length % 1000 === 0) {
    rs.unpipe();
    // clear `rows` right after `unpipe`
    const entries = rows;
    rows = [];
    this._insertEntries(db, entries, () => {
      rs.pipe(csvParser);
    });
  }
});
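Note that unpipe() only detaches the destination: rows the parser has already buffered may still be emitted after the unpipe, which is part of why this approach isn't ideal.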

Upvotes: 7

manikawnth

Reputation: 3249

You cannot do it unless you modify the csvtojson library.

This is the link you should read first
https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states

The stream is in paused mode after you call rs.pause(). In fact, even if you don't call it, a readable stream starts out in paused mode.

The stream goes into flowing mode under three scenarios (see the sketch after this list):

  • an .on('data') event listener is attached, or
  • a .pipe() destination is attached, or
  • readable.resume() is called explicitly.
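
Here's a minimal sketch (assuming a local foo.csv exists) of the second scenario: pipe() calls resume() on its source internally, overriding an earlier explicit pause():

const fs = require("fs");
const { PassThrough } = require("stream");

const rs = fs.createReadStream("./foo.csv");
rs.pause();

console.log(rs.isPaused()); // true - explicitly paused, nothing consuming yet

// piping to any writable destination resumes the source
rs.pipe(new PassThrough());

console.log(rs.isPaused()); // false - pipe() resumed the stream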

In your case, the fromStream() method attaches a pipe to your readable stream, which is what resumed it.

Reference code:
https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378

Converter.prototype.fromStream=function(readStream,cb){
  if (cb && typeof cb ==="function"){
    this.wrapCallback(cb);
  }
  process.nextTick(function(){
    readStream.pipe(this);
  }.bind(this))
  return this;
}

Upvotes: 3
