Reputation: 465
I am trying to replace an ETL process with NodeJS streams. The Transform stream I am attempting to write takes in a dataset and, based on configuration data, outputs one or more records per input record. In other words, if it reads 100,000 records, the transformation can end up writing anywhere from 100,000 to 400,000 records. The _transform method only allows its callback to be called once, so I am trying to figure out how to output multiple objects per single input object.
I looked at Duplex streams, but every example I saw used them as a two-way flow, whereas I definitely want my stream to be one-way (or I may just not understand how they work). Does anyone have any suggestions on how to implement this?
Upvotes: 3
Views: 2794
Reputation: 86
NodeJS streams are GREAT for ETL work, but while very powerful they're also pretty complex, and it's easy to get lost when you're starting from scratch--as you have already experienced. I was looking for an easy way to use streams while also allowing for code reuse; I ended up creating gulp-etl, which uses streams under the hood. With gulp-etl you could use the gulp-etl-handlelines plugin, which calls a callback for each incoming record; if you want a single incoming record to produce multiple records, it would look something like this:
const { src, dest } = require('gulp');
const handlelines = require('gulp-etl-handlelines').handlelines;

const linehandler = (lineObj, context) => {
  // return null to remove this line
  if (!lineObj.record || lineObj.record["TestValue"] == 'illegalValue') {
    return null;
  }

  let recsToReturn = [];
  // return the incoming record
  recsToReturn.push(lineObj);

  // logic to create a new record
  if (lineObj.record.needsDuplication) {
    // clone newRec from lineObj (shallow-copying the nested record too)
    let newRec = { ...lineObj, record: { ...lineObj.record } };
    // change the new record as needed
    newRec.record.UniqueField = "newValue";
    recsToReturn.push(newRec);
  }

  // return the record(s)
  return recsToReturn;
}

exports.default = function() {
  return src('data/100kRecs.ndjson', { buffer: false /* use streaming mode */ })
    // pipe the files through our handlelines plugin
    .pipe(handlelines({}, { transformCallback: linehandler }))
    .pipe(dest('output/'));
}
We're using Transform streams behind the scenes, but that's all abstracted away; you get the benefits without having to get into the weeds of stream implementation--unless you want to write your own plugins. Plus, you can use lots of existing plugins.
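To illustrate what such a plugin boils down to under the hood, here is a minimal sketch (not gulp-etl's actual code; the function name and handler contract are hypothetical): an object-mode Transform that applies a per-record callback which may return null (drop the record), a single record, or an array of records.

const { Transform } = require('stream');

// Hypothetical sketch: wrap a per-record handler in an object-mode
// Transform. The handler may return null, one record, or an array.
function makeLineTransform(handler) {
  return new Transform({
    objectMode: true,
    transform(record, enc, next) {
      const result = handler(record);
      if (result != null) {
        // normalize a single record into an array, then push each one
        const records = Array.isArray(result) ? result : [result];
        records.forEach(r => this.push(r));
      }
      next();
    }
  });
}

The key point is the same as in the other answer here: push() can be called any number of times per input record, while next() is called exactly once.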
Upvotes: 0
Reputation: 826
The callback can only be called once, but the .push method is what emits data, and it can be called as many times as necessary in the _transform method. Example:
const { Transform } = require('stream');

class MyTransform extends Transform {
  _transform(chunk, enc, next) {
    // chunk arrives as a Buffer by default, so convert it to a string
    // before splitting
    const arrayFromChunk = chunk.toString().split(',');
    arrayFromChunk.forEach(piece => {
      // this.push is what emits readable data; it can be called as
      // often as needed
      this.push(piece);
    });
    next(); // next can only be called once
  }
}
docs here: https://nodejs.org/docs/latest-v18.x/api/stream.html#stream_implementing_a_transform_stream
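To see one input chunk become several output chunks, here's a self-contained run of that pattern (the sample data and comma delimiter are just for illustration; note that chunk arrives as a Buffer unless an encoding is set):

const { Transform } = require('stream');

class MyTransform extends Transform {
  _transform(chunk, enc, next) {
    // convert the Buffer to a string, split it, and push each piece
    chunk.toString().split(',').forEach(piece => this.push(piece));
    next();
  }
}

const pieces = [];
const t = new MyTransform();
t.on('data', d => pieces.push(d.toString()));
t.on('end', () => console.log(pieces)); // one input chunk in, three chunks out
t.end('a,b,c');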
Upvotes: 7