Un1
Un1

Reputation: 4132

How do you push data to a readable stream from within a function?

I'm trying to achieve the following:

Code

const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
let readable = new Stream.Readable({
  objectMode: true,
  read(item) {
    this.push(item)
  }
})

readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir.sync(dir, {"max_depth": 0, "track_inodes": true}, (path, stat) => {
    readable.push(path)
    console.log('pushing a path to readable')
  }) 
}
getPaths("C:/")
console.log('getPaths() ran')
readable.push(null)  // indicates the end of the stream

Problem

The paths are not being compressed and written to the file as the getPaths function finds them and pushes them into the stream, it doesn't happen until it has found all of them. I know it's probably due to the process being synchronous but cannot figure out how to make it work.

I see the following output from the logs:

> // .gz file gets created with size of 0
> // Nothing happens for about 1 minute
> x(184206803) "pushing a path to readable"
> "getPaths() ran"
> // I see the data started being written into the file
> "Done"

UPDATE:

And if I do this asynchronously like this (or use the code from the answer below):

let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 

  ...

  // readable.push(null) 

I get an error (I think, it throws that particular error when it doesn't receive expected data chunk after you're done pushing data into it. If you remove that last line from the code: readable.push(null), and try to run the code again it throws the same error):

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be one of type
 string or Buffer. Received type number

Upvotes: 5

Views: 2514

Answers (1)

Avraham
Avraham

Reputation: 938

Your code is very good and works fine. You just need to remove this.push(item) and set read function with empty body.

Here is a working snippet

const fs = require('fs')
const zlib = require('zlib')
const zip = zlib.createGzip()
const Stream = require('stream')


let wstream = fs.createWriteStream('C:/test/file.txt.gz') 
let readable = new Stream.Readable({
  objectMode: true,
  read() { }
})

readable.pipe(zip).pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    readable.push(path)
  }) 
  walker.on('end', (path, stat) => {
    readable.push(null)
  }) 
}
getPaths("C:/")
console.log('getPaths() ran')

BTW, the right argument name is read(size). It represents the number of bytes to read

EDIT There is no need for the readable stream. You can write directly to zip.

const fs = require('fs');
const zlib = require('zlib');
const zip = zlib.createGzip();
const wstream = fs.createWriteStream('C:/test/file.txt.gz');

zip.pipe(wstream)
.on('finish', (err) => {
  console.log('done');
})

let walkdir = require('walkdir')
function getPaths(dir) {
  let walker = walkdir(dir, {"max_depth": 0, "track_inodes": true})
  walker.on('path', (path, stat) => {
    zip.write(path);
  })
  walker.on('end', (path, stat) => {
    zip.end();
  })
}
getPaths("C:/")
console.log('getPaths() ran')

Upvotes: 1

Related Questions