Reputation: 122432
I'm just starting learning about streams in node. I have a string in memory and I want to put it in a stream that applies a transformation and pipe it through to process.stdout
. Here is my attempt to do it:
var through = require('through');
var stream = through(function write(data) {
this.push(data.toUpperCase());
});
stream.push('asdf');
stream.pipe(process.stdout);
stream.end();
It does not work. When I run the script on the cli via node, nothing is sent to stdout and no errors are thrown. A few questions I have:
push
and queue
?end()
before or after calling pipe()
?end()
equivalent to push(null)
?Thanks!
Upvotes: 1
Views: 201
Reputation: 42325
First, you want to use write()
, not push()
. write()
puts data in to the stream, push()
pushes data out of the stream; you only use push()
when implementing your own Readable
, Duplex
, or Transform
streams.
Second, you'll only want to As @naomik pointed out, this isn't true in general since a write()
data to the stream after you've setup the pipe()
(or added some event listeners). If you write to a stream with nothing wired to the other end, the data you've written will be lost.Writable
stream will buffer write()
s. In your example you do need to write()
after pipe()
though. Otherwise, the process will end before writing anything to STDOUT
. This is possibly due to how the through
module is implemented, but I don't know that for sure.
So, with that in mind, you can make a couple simple changes to your example to get it to work:
var through = require('through');
var stream = through(function write(data) {
this.push(data.toUpperCase());
});
stream.pipe(process.stdout);
stream.write('asdf');
stream.end();
Now, for your questions:
write()
it, just like we're doing with stream.wrtie('asdf')
in your example.queue()
function, did you mean write()
? Like I said above, write()
is used to put data in to a stream, push()
is used to push data out of the stream. Only call push()
in your owns stream implementations.end()
after all your data has been written to your stream. end()
basically says: "Ok, I'm done now. Please finish what you're doing and close the stream."push(null)
is pretty much equivalent to end()
. That being said, don't call push(null)
unless you're doing it inside your own stream implementation (as stated above). It's almost always more appropriate to call end()
.Upvotes: 2
Reputation: 135207
Just use the vanilla stream API
var Transform = require("stream").Transform;
// create a new Transform stream
var stream = new Transform({
decodeStrings: false,
encoding: "ascii"
});
// implement the _transform method
stream._transform = function _transform(str, enc, done) {
this.push(str.toUpperCase() + "\n";
done();
};
// connect to stdout
stream.pipe(process.stdout);
// write some stuff to the stream
stream.write("hello!");
stream.write("world!");
// output
// HELLO!
// WORLD!
Or you can build your own stream constructor. This is really the way the stream API is intended to be used
var Transform = require("stream").Transform;
function MyStream() {
// call Transform constructor with `this` context
// {decodeStrings: false} keeps data as `string` type instead of `Buffer`
// {encoding: "ascii"} sets the encoding for our strings
Transform.call(this, {decodeStrings: false, encoding: "ascii"});
// our function to do "work"
function _transform(str, encoding, done) {
this.push(str.toUpperCase() + "\n");
done();
}
// export our function
this._transform = _transform;
}
// extend the Transform.prototype to your constructor
MyStream.prototype = Object.create(Transform.prototype, {
constructor: {
value: MyStream
}
});
Now use it like this
// instantiate
var a = new MyStream();
// pipe to a destination
a.pipe(process.stdout);
// write data
a.write("hello!");
a.write("world!");
Output
HELLO!
WORLD!
Some other notes about .push
vs .write
.
.write(str)
adds data to the writable buffer. It is meant to be called externally. If you think of a stream like a duplex file handle, it's just like fwrite
, only buffered.
.push(str)
adds data to the readable buffer. It is only intended to be called from within our stream.
.push(str)
can be called many times. Watch what happens if we change our function to
function _transform(str, encoding, done) {
this.push(str.toUpperCase());
this.push(str.toUpperCase());
this.push(str.toUpperCase() + "\n");
done();
}
Output
HELLO!HELLO!HELLO!
WORLD!WORLD!WORLD!
Upvotes: 2
Reputation: 35011
Based on the examples for stream (http://nodejs.org/api/stream.html#stream_readable_pipe_destination_options)
and through (https://www.npmjs.org/package/through)
it doesn't look like you are using your stream correctly... What happens if you use write(...) instead of push(...)?
Upvotes: 0