Sai

Reputation: 2032

Saving a data stream into Cassandra using Node.js

I have a data stream (via Node EventEmitter) emitting data in JSON format and would like to save the stream into Cassandra as it gets emitted. Is there an elegant way to implement this functionality?

The driver I'm using is nodejs-dse-driver and the Cassandra version is 3.11.1. Please suggest any recommended plugins that I can leverage to accomplish the above task.

Upvotes: 0

Views: 1153

Answers (2)

Dustin

Reputation: 1026

This is a good use case for a Transform Stream.

If you have a true Readable stream, you can pipe it into any Transform stream. An event emitter is not itself a readable stream, though, so you may need to change your original data-fetching implementation.
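If changing the source isn't an option, you can also wrap the emitter in a Readable stream yourself. A minimal sketch, assuming the emitter fires hypothetical 'data' and 'end' events with one JSON object per 'data' event:

const { Readable } = require('stream');

// The event names 'data' and 'end' are assumptions; use whatever your emitter fires.
const readable = new Readable({ objectMode: true, read() {} });
myEmitter.on('data', (obj) => readable.push(obj));
myEmitter.on('end', () => readable.push(null)); // null signals end-of-stream

Note this sketch ignores backpressure (the return value of push()), which is usually acceptable for modest volumes.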

See the NodeJS documentation for implementation details. https://nodejs.org/api/stream.html#stream_new_stream_transform_options

Something like this, depending on your version of NodeJS:

const { Transform } = require('stream');

// `cassandra` is assumed to be a connected nodejs-dse-driver client
// and `query` a parameterized INSERT statement
const myTransformStream = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // insert the row into Cassandra
    cassandra.execute(query, row, { prepare: true }, (err) => {
      // after the execute is done, call back to process more;
      // forwarding err makes the stream emit 'error' on a failed insert
      callback(err, row);
    });
  }
});

originalStream.pipe(myTransformStream);
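Note that pipe() does not forward errors between streams, so attach an 'error' handler to the transform, or on Node 10+ use stream.pipeline, which wires up error propagation and cleanup for you:

const { pipeline } = require('stream');

pipeline(originalStream, myTransformStream, (err) => {
  if (err) {
    // the source errored or an insert failed
    console.error('Stream processing failed:', err);
  }
});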

Upvotes: 1

jorgebg

Reputation: 6600

You can read the data in chunks from your source and insert them in parallel, for example using the async library:

const async = require('async');

const limit = 10;
stream.on('readable', () => {
  let r;
  const rows = [];
  async.whilst(function condition() {
    // start a fresh group and buffer up to `limit` rows from the stream
    rows.length = 0;
    while (rows.length < limit && (r = stream.read()) != null) {
      rows.push(r);
    }
    return rows.length > 0;
  }, function eachGroup(next) {
    // we have a group of 10 rows or less to save;
    // we can do it in a batch, or in parallel with async.each()
    async.each(rows, (r, eachCallback) => {
      // adapt the row to query parameters, for example a CSV line
      const params = r.split(',');
      client.execute(query, params, { prepare: true }, eachCallback);
    }, next);
  }, function groupFinished(err) {
    if (err) {
      // something went wrong while saving
      // TODO: handle err
      return;
    }
    // this chunk of rows emitted by the stream was saved
  });
}).on('end', () => {
  // no more data from the source
});
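As the comment above suggests, the group could also be saved with a single call to the driver's batch() method instead of async.each(). A rough sketch, with the same assumed client, query, and CSV rows (keeping in mind Cassandra batches are designed for atomicity rather than raw throughput):

const queries = rows.map((r) => ({
  query: query,
  params: r.split(','),
}));
client.batch(queries, { prepare: true }, next);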

Upvotes: 0
