Un1

Reputation: 4132

Writing a large number of strings from memory to a file using streams in Node.js causes high RAM usage

Problem

I'm trying to write millions of strings into a file using Node.js streams, but the RAM usage goes up to 800MB during the process:

const fs = require('fs')
const walkdir = require('walkdir')

let options = {
  "max_depth": 0,
  "track_inodes": true
}

let dir = "C:/"
let paths = walkdir(dir, options)
var wstream = fs.createWriteStream('C:/test/file.txt')
wstream.write('[')

paths.on('path', function(path, stat) {
  wstream.write(`"${path}",`)
})

paths.on('end', function(path, stat) {
  wstream.write(']')
  wstream.end()

  // Compressing the file after it's written:
  const gzip = require('zlib').createGzip()
  const inp = fs.createReadStream('C:/test/file.txt')
  const out = fs.createWriteStream('C:/test/file.txt.gz')
  inp.pipe(gzip).pipe(out)
})

I also tried writing the file like this:

...
paths.on('path', function(path, stat) {
  fs.writeFileSync('C:/test/file.txt', path)
})
...

And I also tried sync:

walkdir.sync(dir, options, callback)

function callback(path) {
  let res = wstream.write(`"${path}",`)
  if (!res) {
    wstream.once('drain', callback)
  }
  else {
    callback()
  }
}

But both of these produce the same result: RAM usage goes up to around 500-800 MB.

I also tried the following method. The RAM usage always stays at ~100 MB, but it doesn't really work: it writes 412 KB into the file and then keeps utilizing CPU while nothing else happens (the other methods finish writing the file in under 1-2 minutes):

const readdirp = require('readdirp');

const { Transform } = require('stream');
const entryInfoStream = readdirp({
  root: dir
});

entryInfoStream
  .pipe(new Transform({
    objectMode: true,
    transform(entryInfo, encoding, callback) {
      this.push(entryInfo.path);
      callback();
    },
  }))
  .pipe(wstream);
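
For reference, here is a minimal sketch of the same readdirp-based listing with explicit backpressure. It assumes readdirp v3+ (where readdirp(root, options) returns an object-mode readable stream that can be consumed with for await...of); the function name and the type option are illustrative:

const fs = require('fs');
const readdirp = require('readdirp');

async function dumpPaths(dir, dest) {
  const out = fs.createWriteStream(dest);
  out.write('[');
  // 'files_directories' asks readdirp for both file and directory entries.
  for await (const entry of readdirp(dir, { type: 'files_directories' })) {
    // write() returns false once its buffer is full; wait for 'drain'
    // so the directory walk never outruns the file stream.
    if (!out.write(`"${entry.path}",`)) {
      await new Promise((resolve) => out.once('drain', resolve));
    }
  }
  out.write(']');
  out.end();
}

dumpPaths('C:/', 'C:/test/file.txt');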

Questions

How can I write all of these paths into the file (and compress it afterwards) without the RAM usage spiking like this?

Upvotes: 2

Views: 2291

Answers (2)

s.d

Reputation: 29436

You can implement the entire logic without any external dependencies to see where to optimize. Below is a minimal implementation that you can tweak:

const fs = require('fs');
const path = require('path');
const zlib = require('zlib');
const stream = require('stream');

// Recursively walk the file system, writing every path into the stream
function walk(dir, str, busy) {
    busy.inc();
    fs.readdir(dir, (e, c) => {
        if (!e) {
            c.forEach(f => {
                const p = path.join(dir, f);
                busy.inc();
                fs.stat(p, (e, s) => {
                    if (!e && s.isDirectory()) {
                        walk(p, str, busy);
                    }
                    str.write(p + "\n");
                    busy.dec();
                });
            });
        }
        busy.dec();
    });
}

// Scan FS and write to file
async function scan(dir, dest) {
    return new Promise((resolve) => {
        const gzStr = zlib.createGzip();
        const destStr = fs.createWriteStream(dest);

        let count = 0;
        const busy = {
            inc: () => count++,
            dec: () => {
                count--;
                if (count < 1) {
                    process.nextTick(() => {
                        gzStr.end();
                        gzStr.once('finish', resolve);
                    });
                }
            }
        };

        walk(dir, gzStr, busy);
        gzStr.pipe(destStr);
    });
}

// Test above code
(async () => {
    // Save gzipped
    await scan(__dirname, './files.txt.gz');

    // Gunzip to verify
    const unzipped = fs.createWriteStream('./files.txt');
    fs.createReadStream('./files.txt.gz').pipe(zlib.createGunzip()).pipe(unzipped);

    // End 
    unzipped.on('close', () => console.log('done'));
})();
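
One thing worth noting about the sketch above: walk() never checks the return value of str.write(), so on a very large tree the gzip stream's internal buffer can still grow ahead of the disk. A small, hypothetical helper (not part of the code above) shows the standard pattern for honoring backpressure with Node's stream API:

// Hypothetical helper: resolves immediately if the chunk was accepted,
// otherwise waits for the stream to emit 'drain' before continuing.
function writeWithBackpressure(stream, chunk) {
    return stream.write(chunk)
        ? Promise.resolve()
        : new Promise((resolve) => stream.once('drain', resolve));
}

Awaiting this helper before scheduling the next fs.readdir call would keep memory bounded even when gzip cannot keep up with the directory walk.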

Upvotes: 2

jakedipity

Reputation: 900

It's because you're doing things asynchronously without any limits. Each path is going to create a new event for paths.on('path', ...), so all your paths are being loaded onto the event loop much faster than they are being processed, hence the spike in memory. You need to limit the number of paths being written at a time.

You can limit it by using walkdir.sync, but this means you'll only be able to process one path at a time. Also, depending on how you implement it, you might still end up discovering paths faster than you can write to your stream.
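
In its simplest form, that means pausing the walker whenever write() reports a full buffer and resuming once the stream drains. A rough sketch of that idea (it assumes walkdir's pause()/resume() stop and restart the 'path' events, which the code further below also relies on):

const fs = require('fs')
const walkdir = require('walkdir')

const paths = walkdir('C:/', { "max_depth": 0, "track_inodes": true })
const wstream = fs.createWriteStream('C:/test/file.txt')
wstream.write('[')

paths.on('path', function (path) {
  // write() returns false when the stream's buffer is full;
  // pause the walker and resume it once the buffer has drained.
  if (!wstream.write(`"${path}",`)) {
    paths.pause()
    wstream.once('drain', () => paths.resume())
  }
})

paths.on('end', function () {
  wstream.write(']')
  wstream.end()
})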

A more flexible solution is to track how many concurrent paths you are processing and pause the stream once you've hit the limit.

const fs = require('fs')
const walkdir = require('walkdir')

let options = {
  "max_depth": 0,
  "track_inodes": true
}

let dir = "C:/"
let paths = walkdir(dir, options)
var wstream = fs.createWriteStream('C:/test/file.txt')
wstream.write('[')

const maxPaths = 20; // Maximum amount of concurrent paths allowed to process
let currentPaths = 0; // Current amount of concurrent paths being processed
let deferredPaths = []; // If we somehow exceed the limit, store the excess paths here for later processing. This might not be necessary, depending on how walkdir implements their pause function

const finishPathFlush = () => {
  if (deferredPaths.length > 0) {
    // Process any paths in the deferred queue
    wstream.write('"' + deferredPaths.pop() + '",', finishPathFlush);
  } else {
    // No more work to do, resume walkdir
    --currentPaths;
    paths.resume();
  }
}

paths.on('path', function(path, stat) {
  if (currentPaths < maxPaths) {
    // We have room to process this path
    if (++currentPaths === maxPaths) {
      // If we reach the limit pause walkdir
      paths.pause();
    }
    wstream.write(`"${path}",`, finishPathFlush)
  } else {
    // Got too many paths, defer this path
    deferredPaths.push(path);
  }
})

paths.on('end', function(path, stat) {
  wstream.write(']')
  wstream.end()

  // Compressing the file after it's written:
  const gzip = require('zlib').createGzip()
  const inp = fs.createReadStream('C:/test/file.txt')
  const out = fs.createWriteStream('C:/test/file.txt.gz')
  inp.pipe(gzip).pipe(out)
})

Upvotes: 0
