Tomasz Rakowski

Reputation: 1062

Using streams in NodeJS

I'm trying to build an API in Node.js that streams the output of a query executed against a Vertica database.

The Vertica driver for Node.js exposes an unbuffered query interface, which is what I'm using. For more details, see: https://github.com/wvanbergen/node-vertica

Following is my code:

var vertica = require('vertica');
var Readable = require('stream').Readable;
var rs = new Readable;

var conn = vertica.connect( {
    host: 'hostname',
    user: 'user',
    password: 'password',
    database: 'verticadb'    
});


var q = conn.query('select * from table');

q.on('row', function(row) {
    rs.push(row.join(',') + "\n");
});

q.on('end', function(status) {
    rs.push(null);
    rs.pipe(process.stdout);
    conn.disconnect();
});

q.on('error', function(err) {
    conn.disconnect();
});

It returns the correct output, but as far as I can tell it buffers the output of row.join(',') + "\n" and only pipes it to stdout once all rows have been read. My goal is to pipe each row out as soon as it is read. How should I modify my code to make this work? Feel free to replace the Vertica "row" event with anything comparable.

Addendum

I've managed to make it work using what are called "classic readable streams", based on the documentation at https://github.com/substack/stream-handbook.

The code for this:

var vertica = require('vertica');
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;

var conn = vertica.connect( {
    host: 'hostname',
    user: 'user',
    password: 'password',
    database: 'verticadb'
});


var q = conn.query('select * from affiliate_manager_2');

q.on('row', function(row) {
    stream.emit('data', row.join(',') + "\n");
});

q.on('end', function(status) {
    stream.emit('end'); 
    conn.disconnect();
});

q.on('error', function(err) {
    conn.disconnect();
});

stream.pipe(process.stdout);

However, this is the "old" way of doing it, and I would like to know how to do it the "new" way (the streams2 API introduced in Node 0.10).
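For reference, Node's stream module ships a bridge for classic streams like the one above: Readable.prototype.wrap() returns a streams2 Readable that takes its data from an old-style 'data'/'end' source. The sketch below is illustrative only; classicSource and collect are hypothetical helpers, and the hard-coded rows stand in for real query results.

```javascript
const Stream = require('stream'); // legacy constructor, as used in the question
const { Readable } = require('stream');

// Hypothetical classic ("streams1") source: like the question's code, it
// just sets readable = true and emits 'data' and 'end' events.
function classicSource(rows) {
  const old = new Stream();
  old.readable = true;
  setImmediate(() => {
    for (const row of rows) old.emit('data', row.join(',') + '\n');
    old.emit('end');
  });
  return old;
}

// Readable.prototype.wrap() adapts the classic stream to a streams2 Readable.
function wrapClassic(old) {
  return new Readable().wrap(old);
}

// Hypothetical helper: read the whole wrapped stream into one string.
function collect(readable) {
  return new Promise((resolve, reject) => {
    let out = '';
    readable.on('data', (chunk) => { out += chunk.toString(); });
    readable.on('end', () => resolve(out));
    readable.on('error', reject);
  });
}

// Piping works as well: wrapClassic(classicSource(rows)).pipe(process.stdout);
```

Each 'data' event from the classic source is pushed into the wrapped Readable as it arrives, so rows are not held back until the end.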

Upvotes: 1

Views: 2667

Answers (1)

Todd Yandell

Reputation: 14696

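Readable is “abstract”: it expects a function called _read, which the default implementation does not define. Without it, it just buffers every push(chunk) until it sees push(null). That’s the behavior you’re seeing in your example. Note also that your code only calls rs.pipe(process.stdout) inside the 'end' handler, so nothing can reach stdout until every row has already been pushed; the pipe needs to be set up before the query starts emitting rows.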

To get the behavior you want, just add a _read function!

Here’s an example you can adapt to your database:

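var Readable = require('stream').Readable;

var stream = new Readable();

// Node calls _read when the consumer wants data; the first call starts the query.
stream._read = function () {
  var query = …; // your database query goes here
  // Push each row downstream as soon as it arrives.
  query.on('row', function (row) {
    stream.push(JSON.stringify(row) + '\n');
  });
  // push(null) signals the end of the stream.
  query.on('end', function () {
    stream.push(null);
  });
  // Replace _read with a no-op so the query is only started once.
  stream._read = function () {};
};

stream.pipe(process.stdout);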
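A more self-contained sketch of the same idea, runnable without a database. It assumes a hypothetical fakeQuery EventEmitter standing in for the Vertica query object (one 'row' event per array of column values, then 'end'), and a Node.js version that accepts the read option in the Readable constructor. read() is a no-op because rows are pushed from the query's events rather than pulled; like the example above, it ignores the return value of push(), so it does not apply backpressure.

```javascript
const { Readable, Writable } = require('stream');
const { EventEmitter } = require('events');

// Hypothetical stand-in for the Vertica query object: emits one 'row'
// event (an array of column values) every 10 ms, then 'end'.
function fakeQuery(rows) {
  const q = new EventEmitter();
  let i = 0;
  const timer = setInterval(() => {
    if (i < rows.length) {
      q.emit('row', rows[i++]);
    } else {
      clearInterval(timer);
      q.emit('end');
    }
  }, 10);
  return q;
}

// Wrap an event-based query in a streams2 Readable. read() is a no-op
// because data is pushed from the query's events, not pulled on demand.
function queryToStream(query) {
  const rs = new Readable({ read() {} });
  query.on('row', (row) => rs.push(row.join(',') + '\n'));
  query.on('end', () => rs.push(null));
  query.on('error', (err) => rs.destroy(err));
  return rs;
}

// Usage: pipe right away, then record each chunk the sink receives and
// whether the query had already ended at that moment.
function demo() {
  return new Promise((resolve, reject) => {
    const query = fakeQuery([[1, 'a'], [2, 'b'], [3, 'c']]);
    let queryEnded = false;
    query.on('end', () => { queryEnded = true; });
    const seen = [];
    const sink = new Writable({
      write(chunk, _encoding, callback) {
        seen.push({ text: chunk.toString(), queryEnded });
        callback();
      },
    });
    queryToStream(query).pipe(sink)
      .on('finish', () => resolve(seen))
      .on('error', reject);
  });
}

// With the real driver this would look like (conn as in the question):
// queryToStream(conn.query('select * from table')).pipe(process.stdout);
```

Because the pipe is attached before the first row arrives, each row reaches the sink as soon as its 'row' event fires; demo() records that no chunk was delivered after the query's 'end'.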


Upvotes: 1
