Reputation: 1062
I'm trying to build an API in Node.js that streams the output of a query executed against a Vertica DB.
The Vertica DB driver for Node.js exposes an unbuffered query interface, which I'm using. For more details, please see: https://github.com/wvanbergen/node-vertica
Following is my code:
// Runs a query against Vertica, formats each row as a comma-separated line,
// pushes it into a Readable stream and finally pipes that stream to stdout.
var vertica = require('vertica');
var Readable = require('stream').Readable;
// Stream that the 'row' handler below feeds; no _read is defined on it.
var rs = new Readable;
var conn = vertica.connect( {
host: 'hostname',
user: 'user',
password: 'password',
database: 'verticadb'
});
// No callback is passed to query(); results are consumed through the
// 'row' / 'end' / 'error' events registered below.
var q = conn.query('select * from table');
q.on('row', function(row) {
// Assumes row is an array of column values (it is join()ed) - TODO confirm.
// At this point nothing has been piped from rs yet: pipe() is only called
// in the 'end' handler, so each pushed line waits inside the stream.
rs.push(row.join(',') + "\n");
});
q.on('end', function(status) {
// push(null) marks the end of the stream's data.
rs.push(null);
// The consumer is attached only here, after every row has been pushed,
// so stdout receives nothing until the whole result set has been read.
rs.pipe(process.stdout);
conn.disconnect();
});
q.on('error', function(err) {
// NOTE(review): err is neither reported nor forwarded to rs, and rs is never
// ended on this path; only the connection is closed.
conn.disconnect();
});
It does return the appropriate output, but my understanding is that it actually buffers up the output of row.join(',') + "\n" and only pipes it out to stdout once all rows have been read. My objective is to pipe each row out as soon as it is read. How should I modify my code to make it work? You can replace the Vertica "row" event with anything comparable.
I've managed to make it work using what is called "classic readable streams" based on the documentation found on: https://github.com/substack/stream-handbook.
The code for this:
// Streams the rows of a Vertica query to stdout, one comma-separated line per
// row, using a "classic" readable stream: a bare Stream object on which
// 'data' and 'end' events are emitted by hand.
var vertica = require('vertica');
var Stream = require('stream');
var stream = new Stream;
// Flag the object as a readable source for consumers of classic streams.
stream.readable = true;
var conn = vertica.connect( {
  host: 'hostname',
  user: 'user',
  password: 'password',
  database: 'verticadb'
});
// Guards finish() so the stream is ended and the connection closed only once,
// whichever of the query's 'end' / 'error' events arrives (or arrives first).
var finished = false;
// Terminates the output stream and releases the database connection.
function finish() {
  if (finished) {
    return;
  }
  finished = true;
  stream.emit('end');
  conn.disconnect();
}
var q = conn.query('select * from affiliate_manager_2');
q.on('row', function(row) {
  // Forward each row to the piped destination as soon as it is received.
  stream.emit('data', row.join(',') + "\n");
});
q.on('end', function(status) {
  finish();
});
q.on('error', function(err) {
  // Report the failure instead of dropping it silently, then end the stream
  // so the pipe is not left waiting for an 'end' that would never come.
  console.error(err);
  finish();
});
// Attach the consumer before any row event can fire (query events are
// delivered asynchronously, after this script has finished running).
stream.pipe(process.stdout);
However, this is the "old" way of doing it, and I would like to know how to do it the "new" way.
Upvotes: 1
Views: 2667
Reputation: 14696
`Readable` is “abstract”. It’s looking for a function called `_read`, which is not defined on the default implementation. Without it, it just buffers every `push(chunk)` until it sees `push(null)`. That’s the behavior you’re seeing in your example.
To get the behavior you want, just add a `_read` function!
Here’s an example you can adapt to your database:
// Example: a Readable whose _read starts the query the first time it is
// called and pushes every row into the stream as it arrives.
var Readable = require('stream').Readable;
var stream = new Readable;
stream._read = function () {
// Placeholder - start your (unbuffered) database query here.
var query = …;
query.on('row', function (row) {
// Serialize each row as one JSON line and push it immediately.
// NOTE(review): the return value of push() is not checked, so rows keep
// being pushed regardless of how fast the consumer drains the stream.
stream.push(JSON.stringify(row) + '\n');
});
query.on('end', function () {
// push(null) signals that no more data will follow.
stream.push(null);
});
// Swap _read for a no-op so that later calls do not start the query again;
// from here on, data arrives only through the row handler above.
stream._read = function () {};
};
// Attaching a consumer is what leads to _read being invoked.
stream.pipe(process.stdout);
Further reading:
Upvotes: 1