Reputation: 523
I'm currently processing a file stream line by line by running it through a transform stream that emits 'line'
events. I would like to be able to, upon finding that the current line matches some criteria, pause the input file stream, start processing a new stream, and when that is finished, resume processing the original stream line by line. I've condensed it down to a minimal example below:
test.coffee:
# test.coffee — minimal repro: pipe this file through TestTransform, handle
# its custom 'line' events, and on a matching line try to pause the transform
# while a second read stream is piped to stdout.
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
# NOTE(review): pause() only throttles flowing-mode 'data' delivery; the
# custom 'line' event is emitted directly from _transform/_flush, so this
# has no effect on it — which is the bug being asked about.
inStream.pause()
# NOTE(review): .pipe() returns the destination (process.stdout), and
# writable streams emit 'finish', not 'end' — so this handler never fires.
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
test-transform.coffee:
# test-transform.coffee — Transform stream that buffers incoming chunks and
# re-emits them one line at a time, both via push() (readable side) and via a
# custom 'line' event.
Transform = require('stream').Transform
module.exports =
class TestTransform extends Transform
# Readable side is in object mode so each push() is delivered as one line.
constructor: ->
Transform.call @, readableObjectMode: true
# Holds any partial line carried over between chunks.
@buffer = ""
# Push/emit every complete (newline-terminated) line currently buffered,
# leaving any trailing partial line in @buffer.
pushLines: ->
newlineIndex = @buffer.indexOf "\n"
while newlineIndex isnt -1
@push @buffer.substr(0, newlineIndex + 1)
@emit 'line', @buffer.substr(0, newlineIndex + 1)
@buffer = @buffer.substr(newlineIndex + 1)
newlineIndex = @buffer.indexOf "\n"
# Accumulate the chunk and flush out any complete lines it produced.
_transform: (chunk, enc, cb) ->
@buffer = @buffer + chunk.toString()
@pushLines()
cb?()
# On end of input, emit the final (possibly unterminated) line.
# NOTE(review): the "\n" is appended unconditionally, so input that already
# ends with a newline yields one extra blank line — confirm intended.
_flush: (cb) ->
@pushLines()
@buffer += "\n" # ending newline
@push @buffer
@emit 'line', @buffer # push last line
@buffer = ""
cb?()
(Don't worry about the Transform stream too much, it's just an example.) Anyway, the output of coffee test.coffee
looks like:
-->fs = require 'fs'
-->
-->TestTransform = require './test-transform'
-->
-->inStream = new TestTransform
-->
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->
-->inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
-->     inStream.resume()
--> else
--> process.stdout.write line
-->
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
So obviously, the pipe isn't getting paused, it's just continuing until finished (even though the PAUSE
is getting run just as expected), and since "UNPAUSE"
is never getting written out either, the 'end'
callback is never firing. Switching the pause/resume calls from the transform stream to the underlying readStream doesn't seem to work either. I'm assuming from this behavior that node streams somehow don't respect pause/resume from within an event callback.
There may also be another way to accomplish this without calling pause/resume; if there's some way to await the end of a stream and suspend the current execution until it finishes, that'd effectively do what I'm trying to do.
Upvotes: 4
Views: 5185
Reputation: 17434
If I've understood the question correctly, here's a simple Node app using Dust.js that solves the problem.
Dust is a templating engine, but one of its best features is its native understanding of Node Streams. This example uses Dust 2.7.0.
I'm using node-byline
as a replacement for your Transform stream, but it does the same thing — it reads streams line by line.
// index.js — stream ./test.txt line-by-line with node-byline and render each
// line through a Dust template; when a line matches, the `match` helper
// returns a second read stream that Dust interleaves into the output.
var fs = require('fs'),
byline = require('byline'),
dust = require('dustjs-linkedin');
// Line-by-line readable over the input file.
var stream = byline(fs.createReadStream('./test.txt', { encoding: 'utf8' }));
// For each 'data' event from {#byline}: print "--> <line>\n", then whatever
// {match} resolves to (a stream to splice in, or the chunk unchanged).
var template = dust.loadSource(dust.compile('{#byline}--> {.|s}{~n}{match}{/byline}'));
dust.stream(template, {
byline: stream,
match: function(chunk, context) {
var currentLine = context.current();
if(currentLine.match(/line\.match/g)) {
// Returning a stream makes Dust hold further output until it ends.
return fs.createReadStream('./test.txt', 'utf8');
}
return chunk;
}
}).pipe(process.stdout);
Here's the output from my program:
$ node index.js
--> fs = require 'fs'
--> TestTransform = require './test-transform'
--> inStream = new TestTransform
--> fs.createReadStream("./test.coffee").pipe(inStream)
--> inStream.on 'line', (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'line', (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.pause()
fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
console.error "UNPAUSE"
inStream.resume()
else
process.stdout.write line
--> process.stdout.write line
--> console.error "PAUSE"
--> inStream.pause()
--> fs.createReadStream("./test.coffee").pipe(process.stdout).on 'end', ->
--> console.error "UNPAUSE"
--> inStream.resume()
--> else
--> process.stdout.write line
As you can see, it's properly interleaved the output. If I can elaborate further on how the Dust part works, let me know.
EDIT: Here's an explanation of the Dust template specifically.
{#byline} {! look for the context variable named `byline` !}
{! okay, it's a stream. For each `data` event, output this stuff once !}
-->
{.|s} {! output the current `data`. Use |s to turn off HTML escaping !}
{~n} {! a newline !}
{match} {! look up the variable called `match` !}
{! okay, it's a function. Run it and insert the result !}
{! if the result is a stream, stream it in. !}
{/byline} {! done looping !}
Upvotes: 2
Reputation: 523
I actually found a separate answer to this too; not as pretty, but also works.
Essentially, pause()
only pauses output from a piped stream (in "flowing" mode); since I was listening to the 'line'
event, it wasn't flowing, and so pause
of course did nothing. So the first solution was to use removeListener
instead of pause
, which does effectively stop the streaming. The file now looks like:
# Workaround #1: instead of pause(), detach the 'line' listener while the
# second stream is piped to stdout, and re-attach it on that stream's 'end'.
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
# Named handler so it can be removed and re-added around the nested stream.
c = (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.removeListener 'line', c
f = fs.createReadStream("./test.coffee")
# 'end' is attached to the readable itself here (unlike the first attempt,
# which listened on the pipe destination), so it does fire.
f.on 'end', ->
console.error "UNPAUSE"
inStream.on 'line', c
f.pipe(process.stdout)
else
process.stdout.write line
inStream.on 'line', c
And this produces output that almost works:
-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->c = (line) ->
--> process.stdout.write "-->"
--> if line.match /line\.match/g
PAUSE
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
c = (line) ->
process.stdout.write "-->"
if line.match /line\.match/g
process.stdout.write line
console.error "PAUSE"
inStream.removeListener 'line', c
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
console.error "UNPAUSE"
inStream.on 'line', c
f.pipe(process.stdout)
else
process.stdout.write line
inStream.on 'line', c
UNPAUSE
However, it looks like the original readable stream just stopped when I removed the listener; this makes some twisted sort of sense (I guess node garbage collects its readable streams when all listeners have been removed). So the final working solution that I found relies instead on piping. Since the Transform stream I showed above also pushes its output by line to any 'data'
listeners, pause()
can be effectively used here for its original objective, without just killing the stream. The final output:
# Final version: consume the transform's standard 'data' event instead of the
# custom 'line' event, so pause()/resume() actually throttle delivery.
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
# Each chunk is one line (the transform pushes in readable object mode).
line = chunk.toString()
process.stdout.write "-->#{line}"
if line.match /line\.match/g
inStream.pause()
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
inStream.resume()
f.pipe(process.stdout)
with output:
-->fs = require 'fs'
-->TestTransform = require './test-transform'
-->inStream = new TestTransform
-->fs.createReadStream("./test.coffee").pipe(inStream)
-->inStream.on 'data', (chunk) ->
--> line = chunk.toString()
--> process.stdout.write "-->#{line}"
--> if line.match /line\.match/g
fs = require 'fs'
TestTransform = require './test-transform'
inStream = new TestTransform
fs.createReadStream("./test.coffee").pipe(inStream)
inStream.on 'data', (chunk) ->
line = chunk.toString()
process.stdout.write "-->#{line}"
if line.match /line\.match/g
inStream.pause()
f = fs.createReadStream("./test.coffee")
f.on 'end', ->
inStream.resume()
f.pipe(process.stdout)
--> inStream.pause()
--> f = fs.createReadStream("./test.coffee")
--> f.on 'end', ->
--> inStream.resume()
--> f.pipe(process.stdout)
-->
which was the intended result.
Upvotes: 1