arb

Reputation: 7863

Writing to a WriteStream from Event Handler

I have an EventEmitter object that I'm listening to for events. When an event is emitted, I want to write information to a file. I have a WriteStream open via fs.createWriteStream(path, { flags: 'a' }). Currently, my problem is that if I emit events very quickly and frequently, the stream starts to get "backed up", i.e. .write returns false, asking me to stop writing for a moment. Since I'm doing the write inside an event handler, there isn't a callback nearby to use to signal when writing is finished. What can I do, on either the handling or the emitting side, to prevent the backup?

Ultimately, it doesn't seem to matter; all of the data does get written to the file. But I'd like to follow "the rules" as best I can.

I know I can listen for the drain event and start writing again after it fires, but how can I prevent other events from coming into the handler in the meantime? I noticed that if I put a ~50ms delay before each emit, the backup doesn't seem to happen, but that seems like a hack. Plus, what if you have a slower HDD?

Below is an example of my situation:

var fs = require('fs');
var EventEmitter = require('events').EventEmitter;

var ee = new EventEmitter();
var stream = fs.createWriteStream('./file/log.txt', { flags: 'a' });

ee.on('report', function (i) {
  stream.write('new file data ' + i + ' --- ' + Date.now() + '\n');
});

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i);
}

This isn't the exact code, but it's the gist of it. In the real code, the write happens when a response is sent from a running HTTP server, but if I queue up around 1000 requests (via a for loop, for example), I end up in the situation above.

Upvotes: 3

Views: 1490

Answers (2)

arb

Reputation: 7863

I actually ended up finding a much simpler solution to this problem, using a read stream piped into a write stream. See the code below for an example.

var stream = require('stream');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;

var ee = new EventEmitter();
var writeStream = fs.createWriteStream('./file/log.txt', { flags: 'a' });
var readStream = new stream.Readable();
// A Readable must implement _read, but this one is intentionally a no-op;
// data is supplied by push() calls in the event handler instead
readStream._read = function () {};

ee.on('report', function (i) {
  readStream.push(i.toString());
});

// pipe() handles the back pressure between the two streams;
// { end: false } keeps the write stream open even if the read stream ends
readStream.pipe(writeStream, { end: false });

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i);
}

This lets Node's pipe and stream machinery handle the back pressure in coordination with the OS. This is the preferred approach to this problem, IMO.
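
A note on the design choice: the no-op _read is only there because a plain Readable requires one. The built-in stream.PassThrough gives the same behaviour without it. A minimal sketch of that variant, reusing the file path and event name from the example above:

var stream = require('stream');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;

var ee = new EventEmitter();
var writeStream = fs.createWriteStream('./file/log.txt', { flags: 'a' });

// PassThrough is a built-in Transform that simply forwards whatever is
// written to it, so no _read implementation is needed
var passThrough = new stream.PassThrough();
passThrough.pipe(writeStream);

ee.on('report', function (i) {
  // pipe() pauses/resumes the PassThrough against the file stream's back pressure
  passThrough.write(i.toString());
});

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i);
}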

Upvotes: 1

Mike S

Reputation: 42335

The ideal way to handle this is to pause() the incoming events, which you can do if the events are coming from a stream or are otherwise pause-able somehow, but that's not always possible.
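
For example, if the events are coming from a readable stream, the classic pattern is to pause() the source whenever write() returns false and resume() it once the write stream emits drain. A minimal sketch of that pattern (the input file path here is purely illustrative):

var fs = require('fs');

var source = fs.createReadStream('./file/input.txt'); // any pause-able Readable
var dest = fs.createWriteStream('./file/log.txt', { flags: 'a' });

source.on('data', function (chunk) {
  if (!dest.write(chunk)) {
    // the write stream is saturated; stop the source until it drains
    source.pause();
    dest.once('drain', function () {
      source.resume();
    });
  }
});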

If you can't pause the incoming events somehow, then the way I usually handle this is by using the queue function of the async module. There are certainly plenty of other ways to do this, but using a queue is the easiest way I've found and the async module (which is great for lots of async operations) provides a good one.

The basic idea is to put all your write calls into a queue that is configured to process only one task at a time. If you get false back from your stream.write call, you pause() the queue. Once you get a drain event from your stream, you resume() the queue again. That way you're not writing to the stream while it's saturated, but you're still able to receive events and queue them up for when the stream is ready for them.

Doing that with your example code would look something like this:

var async = require('async');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;

var ee = new EventEmitter();
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} );

// Create a queue with a concurrency of 1
var writeQueue = async.queue(function(data, callback) {
    if (!stream.write(data)) {
        // if write() returns false, it's saturated; pause the queue
        writeQueue.pause();
    }
    callback();
}, 1); // <-- concurrency argument here; it's easy to miss ;)

stream.on('drain', function() {
    // the stream isn't saturated anymore; resume the queue
    writeQueue.resume();
});

ee.on('report', function (i) {
    // instead of writing directly to the stream, push data to the writeQueue
    writeQueue.push('new file data ' + i + ' --- '  + Date.now() + '\n');
});

for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i);
}

Note: This isn't really that different from just letting the stream buffer things internally. You're still buffering the data; you're just doing it yourself, which gives you more control over the situation.

Upvotes: 0
