HHH

Reputation: 169

Writing large amounts of streamed data frequently with writestream

I'm trying to write a live websocket feed line-by-line to a file - I think for this I should be using a writable stream.

My problem here is that the data received is in the region of 10 lines per second, which quickly fills the buffer.

I understand that when writing to a stream from a source you control, you would normally apply some sort of backpressure logic, but what should I do if I don't control the source? Should I batch up the writes and write, say, 500 lines at a time instead of line-by-line, or should I use some other way to save this data?

Upvotes: 0

Views: 1034

Answers (1)

jfriend00

Reputation: 708106

How big are the lines? 10 lines per second sounds trivial to stream to disk unless the lines are gigantic or the disk is really slow. Ultimately, if you have no way to apply backpressure, a fast source or a slow disk can still overwhelm you, and you'd have to decide how much you can reasonably buffer and eventually just drop some of the data if you get behind.
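
If you do reach the point of dropping data, a minimal sketch of that idea (not from the original answer) might look like the following. Here `socket` and its 'message' event are stand-ins for whatever websocket client you're using, and MAX_QUEUED_LINES is an arbitrary cap:

const fs = require('fs');

const stream = fs.createWriteStream("feed.log", { flags: "a" });
const MAX_QUEUED_LINES = 10_000;     // how much backlog we tolerate before dropping
let queue = [];
let waitingForDrain = false;

function writeLine(line) {
    if (waitingForDrain) {
        // stream's buffer is full - queue the line instead of writing it now
        queue.push(line);
        if (queue.length > MAX_QUEUED_LINES) {
            queue.shift();           // drop the oldest line rather than grow without bound
        }
        return;
    }
    if (!stream.write(line + "\n")) {
        waitingForDrain = true;
        stream.once('drain', () => {
            waitingForDrain = false;
            const pending = queue;
            queue = [];
            pending.forEach(writeLine);   // replay whatever queued up while we waited
        });
    }
}

// `socket` stands in for your websocket client (e.g. the 'ws' package)
socket.on('message', msg => writeLine(msg.toString()));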

But, you should be able to write a lot of data. On my regular hard disk (using the generic stream code below with no additional buffering), I can do sequential writes of 100,000,000 bytes at a speed of 55 MBytes/sec.

So, if you have 10 lines per second coming in, as long as the lines were below roughly 5,000,000 bytes each, my hard drive could keep up.

Here's the code I used to test it:

const fs = require('fs');
const { Bench } = require('../../Github/measure');
const { addCommas } = require("../../Github/str-utils");

// one 102-byte line of test data
const lineData = Buffer.from("012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n", 'utf-8');


let stream = fs.createWriteStream("D:\\Temp\\temp.txt");
stream.on('open', function() {
    let linesRemaining = 1_000_000;

    let b = new Bench();
    let bytes = 0;

    function write() {
        let readyForMore;
        do {
            linesRemaining--;
            bytes += lineData.length;
            if (linesRemaining === 0) {
                // pass a callback on the final write so we know when it's flushed
                readyForMore = stream.write(lineData, done);
            } else {
                readyForMore = stream.write(lineData);
            }
        } while (linesRemaining > 0 && readyForMore);
        if (linesRemaining > 0) {
            // internal buffer is full - wait for it to drain before writing more
            stream.once('drain', write);
        }
    }

    function done() {
        b.markEnd();
        console.log(`Time to write ${addCommas(bytes)} bytes: ${b.formatSec(3)}`);
        console.log(`bytes/sec = ${addCommas((bytes/b.sec).toFixed(0))}`);
        console.log(`MB/sec = ${addCommas(((bytes/(1024 * 1024))/b.sec).toFixed(1))}`);
        stream.end();
    }

    b.markBegin();
    write();
});

Theoretically, it is more efficient for your disk to do fewer, larger writes than tons of small writes. In practice, because of the way the writeStream works, as soon as an inefficient write gets slow, the next writes get buffered and it kind of self-corrects. If you were really trying to minimize the load on the disk, you would buffer writes until you had at least something like 4k to write. The issue is that each write potentially has to allocate some space for the file (which involves writing to a table on the disk), then seek to where the bytes should be written, then write the bytes. Fewer, larger writes (up to some limit that depends upon the internal implementation) reduce the number of times that file-allocation overhead has to be paid.

So, I ran a test. I modified the above code (shown below) to buffer into 4k chunks and write them out in 4k chunks. The write throughput increased from 55 MBytes/sec to 284.2 MBytes/sec.

So, the theory holds true that you will write faster if you buffer into larger chunks.

But, even the simpler, non-buffered version may be plenty fast.

Here's the test code for the buffered version:

const fs = require('fs');
const { Bench } = require('../../Github/measure');
const { addCommas } = require("../../Github/str-utils");

// one 102-byte line of test data
const lineData = Buffer.from("012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n", 'utf-8');


let stream = fs.createWriteStream("D:\\Temp\\temp.txt");
stream.on('open', function() {
    let linesRemaining = 1_000_000;

    let b = new Bench();
    let bytes = 0;

    let cache = [];
    let cacheTotal = 0;
    const maxBuffered = 4 * 1024;

    // accumulate small writes until we have at least 4k, then write one larger chunk
    stream.myWrite = function(data, callback) {
        if (callback) {
            // final write - flush whatever is cached along with this data
            cache.push(data);
            return stream.write(Buffer.concat(cache), callback);
        } else {
            cache.push(data);
            cacheTotal += data.length;
            if (cacheTotal >= maxBuffered) {
                let ready = stream.write(Buffer.concat(cache));
                cache.length = 0;
                cacheTotal = 0;
                return ready;
            } else {
                return true;
            }
        }
    }

    function write() {
        let readyForMore;
        do {
            linesRemaining--;
            bytes += lineData.length;
            if (linesRemaining === 0) {
                readyForMore = stream.myWrite(lineData, done);
            } else {
                readyForMore = stream.myWrite(lineData);
            }
        } while (linesRemaining > 0 && readyForMore);
        if (linesRemaining > 0) {
            stream.once('drain', write);
        }
    }

    function done() {
        b.markEnd();
        console.log(`Time to write ${addCommas(bytes)} bytes: ${b.formatSec(3)}`);
        console.log(`bytes/sec = ${addCommas((bytes/b.sec).toFixed(0))}`);
        console.log(`MB/sec = ${addCommas(((bytes/(1024 * 1024))/b.sec).toFixed(1))}`);
        stream.end();
    }

    b.markBegin();
    write();
});

This code uses a couple of my local libraries for measuring the time and formatting the output. If you want to run this yourself, you can substitute your own logic for those.
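
If you don't have equivalents handy, here is a rough, hypothetical stand-in for Bench and addCommas, written against the interface the test code appears to use (markBegin, markEnd, sec, formatSec, and a comma formatter); it is my guess, not the author's actual library code:

// Hypothetical stand-ins for the '../../Github/...' requires, inferred from usage.
class Bench {
    markBegin() { this.start = process.hrtime.bigint(); }
    markEnd() { this.end = process.hrtime.bigint(); }
    get sec() { return Number(this.end - this.start) / 1e9; }   // elapsed seconds
    formatSec(digits) { return `${this.sec.toFixed(digits)} sec`; }
}

function addCommas(n) {
    // add thousands separators, e.g. 102000000 -> "102,000,000"
    return Number(n).toLocaleString('en-US');
}

module.exports = { Bench, addCommas };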

Upvotes: 1
