Reputation: 2032
I have a data stream (via Node EventEmitter) emitting data in JSON format and would like to save the stream into Cassandra as it gets emitted. Is there an elegant way to implement this functionality?
The driver that I'm using is nodejs-dse-driver and the Cassandra version is 3.11.1. Please suggest any recommended plugins that I can leverage to accomplish this.
Upvotes: 0
Views: 1153
Reputation: 1026
This is a good use case for a Transform Stream.
If you have a true Readable stream, you can pipe it into any Transform stream. A plain EventEmitter is not a Readable stream, though, so you may need to change your original data-fetching implementation.
See the Node.js documentation for implementation details: https://nodejs.org/api/stream.html#stream_new_stream_transform_options
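If changing the source is not an option, one way to bridge the gap is to wrap the emitter in an object-mode Readable. This is only a minimal sketch: `emitterToReadable` is a hypothetical helper, and the event names `data`, `end` and `error` are assumptions about what your emitter fires. It does not apply backpressure, so rows pile up in memory if Cassandra is slower than the source.

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Hypothetical helper: exposes the events of a plain EventEmitter as an
// object-mode Readable. The event names 'data', 'end' and 'error' are
// assumptions about the source emitter; adjust them to match yours.
function emitterToReadable(emitter) {
  const readable = new Readable({
    objectMode: true,
    read() {} // nothing to pull: the emitter pushes data on its own schedule
  });
  emitter.on('data', (item) => readable.push(item));
  emitter.once('end', () => readable.push(null)); // null signals end of stream
  emitter.once('error', (err) => readable.destroy(err));
  return readable;
}
```

The result can then be piped like any other stream, for example `emitterToReadable(myEmitter).pipe(myTransformStream)`, where `myEmitter` stands for your existing emitter.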
A Transform along these lines should work, depending on your version of Node.js:
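const { Transform } = require('stream');

const myTransformStream = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // insert into Cassandra code here
    cassandra.execute(query, row, { prepare: true }, (err) => {
      // after the execute is done, callback to process more;
      // passing err stops the stream if the insert failed
      callback(err, row);
    });
  }
});

// Nothing reads the transformed rows here, so resume() discards them;
// otherwise the stream stalls once its output buffer fills up.
originalStream.pipe(myTransformStream).resume();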
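Since the rows are not needed after the insert, a Writable may be a closer fit than a Transform, and it needs no downstream consumer. A rough sketch, assuming the same callback-style `execute` call as above; `createCassandraWriter` and `toParams` are hypothetical names, not part of the driver.

```javascript
const { Writable } = require('stream');

// Hypothetical factory: returns an object-mode Writable that inserts each
// incoming row. `toParams` (an assumption) maps a row to the query parameters.
function createCassandraWriter(client, query, toParams) {
  return new Writable({
    objectMode: true,
    write(row, encoding, callback) {
      // The next row is only handed over after this callback fires,
      // so the source is throttled to the speed of the inserts.
      client.execute(query, toParams(row), { prepare: true }, (err) => callback(err));
    }
  });
}
```

You could then wire it up with `stream.pipeline(originalStream, createCassandraWriter(client, query, toParams), onDone)` on Node 10 or later, where `onDone` is your own completion callback and also receives any error raised along the way.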
Upvotes: 1
Reputation: 6600
You can read the data in chunks from your source and send it in parallel, for example (using the async library):
const async = require('async');

const limit = 10;
stream.on('readable', () => {
  let rows;
  // Note: async v3 passes a callback to the test function;
  // in async v2 the test returns the boolean directly.
  async.whilst(function condition(cb) {
    let r;
    rows = []; // start a fresh group on every iteration
    // check the limit first so a row is never read and then dropped
    while (rows.length < limit && (r = stream.read()) != null) {
      rows.push(r);
    }
    cb(null, rows.length > 0);
  }, function eachGroup(next) {
    // we have a group of 10 rows or less to save
    // we can do it in a batch
    // or we can do it in parallel with async.each()
    async.each(rows, (r, eachCallback) => {
      // Adapt the row to parameters
      // For example, for a comma-separated line:
      const params = r.split(',');
      client.execute(query, params, { prepare: true }, eachCallback);
    }, next);
  }, function groupFinished(err) {
    if (err) {
      // something happened when saving
      // TODO: do something with err
      return;
    }
    // This chunk of rows emitted by the stream was saved
  });
}).on('end', () => {
  // no more data from source
});
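For the batch option mentioned in the comments, the driver's `client.batch()` takes an array of `{ query, params }` objects. Here is a small sketch of how a group could be sent that way; `chunk`, `toBatchQueries` and `saveGroup` are hypothetical helpers, not driver functions. Keep in mind that Cassandra batches are generally best kept to rows of the same partition, so parallel single inserts are often the safer default.

```javascript
// Splits an array into consecutive groups of at most `size` items.
function chunk(items, size) {
  const groups = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Builds the array of { query, params } objects that client.batch() accepts.
// `toParams` (an assumption) adapts one row to the query parameters.
function toBatchQueries(rows, query, toParams) {
  return rows.map((row) => ({ query, params: toParams(row) }));
}

// Hypothetical helper: saves one group of rows with a single batch request.
function saveGroup(client, rows, query, toParams, callback) {
  client.batch(toBatchQueries(rows, query, toParams), { prepare: true }, callback);
}
```

Inside `eachGroup`, the `async.each(...)` call could then be swapped for `saveGroup(client, rows, query, (r) => r.split(','), next)`.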
Upvotes: 0