AFMeirelles

Reputation: 419

Read and process smaller chunks inside a duplex stream in node

I need to take a 20 MB JSON file containing an array of objects, pipe it into a stream, break it into smaller arrays of 33 items each, convert them to HTML, and then pipe the result into another stream (for PDF conversion).

The thing is, I haven't quite understood how Node streams work. I'm trying to solve it with a Duplex stream, but I don't know how to collect the incoming chunks from the upstream stream and push them downstream in parts. In this code:

fs = require 'fs'
{Duplex} = require 'stream'

jsonReader = fs.createReadStream 'source.json'

class Convert extends Duplex

    constructor: ->
        super readableObjectMode: true
        # Duplex.call @, readableObjectMode: true
        @buffer = []

    _read: (lines) ->
        console.log "buffer #{@buffer.length}"
        if @buffer.length is 0
            @push null
        else 
            console.log "lines: #{lines}"
            page = @buffer.slice 0, 33
            console.log page.length
            @buffer.splice 0, 33
            @push page

    _write: (data, encoding, next) ->
        @buffer.push data
        next()

convert = new Convert()

jsonReader.pipe(convert).pipe(process.stdout)

@buffer is always empty. Where does Node store the chunks coming in from the upstream stream?
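For clarity, this is roughly the batching behaviour I'm after, sketched with a Transform stream in object mode. It assumes the incoming chunks have already been parsed into objects, which is exactly the part I can't get working:

{Transform} = require 'stream'

class Batch extends Transform

    constructor: ->
        super objectMode: true
        @items = []

    _transform: (item, encoding, next) ->
        @items.push item
        # push a page downstream once 33 items have accumulated
        @push @items.splice(0, 33) if @items.length is 33
        next()

    _flush: (next) ->
        # push whatever is left over (fewer than 33 items) before ending
        @push @items if @items.length
        next()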

Upvotes: 1

Views: 763

Answers (1)

Shanoor

Reputation: 13682

The data you're receiving in _write is a Buffer, a binary slice of the input file; you don't receive objects or even strings. Either you parse the chunks manually to retrieve your objects, or you load the entire file into memory (20 MB is not that big) and parse it. Here's an example (I use event-stream for easy stream manipulation/creation):

fs = require('fs')
es = require('event-stream')

convert = (path) ->
    # load and parse your file
    content = JSON.parse fs.readFileSync(path, 'utf8')

    es.readable (count, next) ->
        # emit 33 elements at a time until content.length === 0
        while content.length
            this.emit 'data', content.splice(0, 33)

        # close the stream
        this.emit 'end'
        next()

srcPath = __dirname + '/source.json'
# convert is a stream
convert(srcPath)
    # piping to console.log because process.stdout can't handle objects
    .pipe(es.map (arr, next) ->
        console.log arr
        next()
    )
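If you don't want to load the whole file into memory, the first option (parsing the chunks as they arrive) could look roughly like the sketch below. It assumes the JSONStream package (not part of the snippet above) for the incremental parsing, so treat it as an untested outline:

fs = require('fs')
es = require('event-stream')
JSONStream = require('JSONStream')

buffer = []

writeItem = (obj) ->
    buffer.push obj
    # queue a full page downstream once 33 items have accumulated
    @queue buffer.splice(0, 33) if buffer.length is 33

endStream = ->
    # queue the leftover items (fewer than 33), then end the stream
    @queue buffer if buffer.length
    @queue null

# JSONStream.parse('*') emits each element of the top-level array as an object
fs.createReadStream(__dirname + '/source.json')
    .pipe(JSONStream.parse('*'))
    .pipe(es.through(writeItem, endStream))
    .pipe(es.map (page, next) ->
        console.log page.length
        next()
    )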

Upvotes: 0
