Reputation: 5296
I have a bunch of files that I read and process, merging data from the corresponding multiple streams into a single stream.
Is there a more elegant solution than the one below (keeping a separate counter and calling combinedStream.end() after all source streams have emitted end):
let combinedStream = ....;
let counter = 0;

filePaths.forEach(function(filePath) {
    counter += 1;
    const fileStream = fs.createReadStream(filePath);
    const myStream = new MyStream(fileStream);
    // forward the processed data into the combined stream
    myStream.on('data', function(data) {
        combinedStream.write(data);
    });
    myStream.on('end', function() {
        counter -= 1;
        if (counter === 0) {
            combinedStream.end();
        }
    });
});

return combinedStream;
Upvotes: 1
Views: 1084
Reputation: 453
You can just process the files with a Transform stream and then pipe into a PassThrough stream.
Since you are using let, I guess you can use ES2015.
"use strict";
let fs=require('fs');
let filePaths=['./tmp/h.txt','./tmp/s.txt'];
let Stream = require('stream');
class StreamProcessor {
constructor() {
this.process_streams = [];
}
push (source_stream) {
// Create a new Transform Stream
let transform = new StreamTransformer();
// Register the finish event and pipe
transform.processed = transform.wait.call(transform);
source_stream.pipe(transform);
// push the stream to the internal array
this.process_streams.push(transform);
}

    done(callback) {
        let streams = this.process_streams;
        // Wait for all Transform streams to finish processing
        Promise.all(
            streams.map(function(s) { return s.processed; })
        )
        .then(function() {
            let combined_stream = new Stream.PassThrough();
            let remaining = streams.length;
            streams.forEach(function(stream) {
                // end: false, so the first stream to finish does not
                // close the combined stream for the others
                stream.pipe(combined_stream, { end: false });
                stream.on('end', function() {
                    remaining -= 1;
                    if (remaining === 0) {
                        combined_stream.end();
                    }
                });
            });
            // Call the callback with the combined stream
            callback(null, combined_stream);
        })
        .catch(function(err) {
            callback(err);
        });
    }
}

class StreamTransformer extends Stream.Transform {
    constructor() {
        // call super
        super();
    }

    _transform(chunk, enc, transformed) {
        // Process the file contents here:
        // drop the last two characters of each chunk
        let data = chunk.toString();
        data = data.substring(0, data.length - 2);
        this.push(data);
        transformed();
    }

    _flush(flushed) {
        // Add a trailing newline at the end
        this.push('\n');
        flushed();
    }

    wait() {
        // Returns a promise that resolves when all the data has been processed
        let stream = this;
        return new Promise(function(resolve, reject) {
            stream.on('finish', function() {
                resolve(true);
            });
            stream.on('error', function(err) {
                reject(err);
            });
        });
    }
}

// Now you can do:
let process_stream = new StreamProcessor();

filePaths.forEach(function(fpath) {
    let fstream = fs.createReadStream(fpath);
    process_stream.push(fstream);
});

process_stream.done(function(err, combined_stream) {
    // Consume the combined stream
    combined_stream.pipe(process.stdout);
});
The test files contain 'hello' and 'stream'.
// Output is
// hell
// stream
This can be improved further.
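One thing worth noting: pipe() does not forward 'error' events from a source stream to its destination, so if one of the files cannot be read, the promise returned by wait() never settles and done() never calls back. A possible tweak (just a sketch, not part of the answer above) is to forward source errors into the transform inside push():

    // Hypothetical variation of push(): propagate read errors from the source
    // stream into the transform, so wait()'s promise rejects instead of hanging.
    push(source_stream) {
        let transform = new StreamTransformer();
        transform.processed = transform.wait();
        source_stream.on('error', function(err) {
            transform.emit('error', err); // pipe() does not forward 'error'
        });
        source_stream.pipe(transform);
        this.process_streams.push(transform);
    }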
Upvotes: 0
Reputation: 50540
A cleaner approach could be the one used in that repo, even though it does nothing more than hide your counter somewhere and let you work with a more comfortable callback-based model.
This way, your code will look like:
let sharedStream = ...

function onEachFilename(filename, callback) {
    // here you can read from the file and push its data onto the shared stream,
    // then invoke the "internal" callback on the 'end' event
}

function onEndAll() {
    // here you can finalize and close the shared stream
}

forEach(filenames, onEachFilename, onEndAll);
Keep in mind that somewhere there is still a function in charge of counting for you and of invoking the onEnd function once all the callback functions have been invoked.
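For reference, the counting helper the answer alludes to can be as small as this (a minimal sketch of the idea; the actual implementation in the repo may differ):

function forEach(items, iterator, onEnd) {
    // Call iterator(item, done) for each item and fire onEnd
    // once every per-item callback has been invoked.
    let pending = items.length;
    if (pending === 0) return onEnd();
    items.forEach(function(item) {
        iterator(item, function() {
            pending -= 1;
            if (pending === 0) {
                onEnd();
            }
        });
    });
}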
Upvotes: 1