HVT7
HVT7

Reputation: 719

Nodejs Read very large file(~10GB), Process line by line then write to other file

I have a 10 GB log file in a particular format. I want to process this file line by line, apply some transformations, and write the output to another file. I am using Node.js for this operation.

This approach works, but it takes far too long. I was able to do this within 30-45 minutes in Java, but in Node it takes more than 160 minutes to do the same job. The code follows:

Following is the initiation code which reads each line from the input.

var path = '../10GB_input_file.txt';
var output_file = '../output.txt';

/**
 * Entry point: removes any stale output file, then reads the input file
 * line by line, splits each line on ';' and hands the fields to
 * perform_line_ops().
 *
 * @throws if the old output file exists but cannot be deleted
 */
function fileopsmain(){

    // Delete the previous output synchronously, before any line is read.
    // An asynchronous exists()/unlink() pair can complete after the first
    // appendFileSync() calls and silently delete freshly written output.
    try {
        fs.unlinkSync(output_file);
        console.log('successfully deleted ' + output_file);
    } catch (err) {
        // ENOENT only means there was nothing to delete.
        if (err.code !== 'ENOENT') throw err;
    }

    // `highWaterMark` is the createReadStream option that sets the chunk
    // size; `bufferSize` is not listed in the current Node.js documentation.
    new lazy(fs.createReadStream(path, {highWaterMark: 128 * 4096}))
        .lines
        .forEach(function(line){
            var line_arr = line.toString().split(';');
            perform_line_ops(line_arr, line_arr[6], line_arr[7], line_arr[10]);
        }
    );

}

This is the method that performs some operation over that line and passes the input to write method to write it into the output file.

/**
 * Builds the output text for one input line and passes it to
 * write_line_ops().
 *
 * @param {Array}  line_arr     fields of the line (the caller splits the line on ';')
 * @param {*}      range_start  the caller passes line_arr[6]
 * @param {*}      range_end    the caller passes line_arr[7]
 * @param {*}      daynums      the caller passes line_arr[10]
 */
function perform_line_ops(line_arr, range_start, range_end, daynums){

    // accumulates the text to be written for this input line
    var _new_lines = '';
    // NOTE(review): `days` is not declared anywhere in this snippet; presumably
    // it is derived from `daynums` -- confirm, otherwise this throws a ReferenceError.
    for(var i=0; i<days; i++){
        //perform some operation to modify line pass it to print
    }

    // write_line_ops() skips empty strings, so nothing is written when the loop adds no text
    write_line_ops(_new_lines);
}

Following method is used to write data into a new file.

/**
 * Synchronously appends `line` to the output file.
 * Values that are null/undefined or loosely equal to '' are skipped.
 *
 * @param {string} line  text to append
 */
function write_line_ops(line) {
    // Loose comparisons are kept on purpose so the set of skipped values is unchanged.
    var nothing_to_write = (line == null || line == '');
    if (nothing_to_write) {
        return;
    }

    fs.appendFileSync(output_file, line);
}

I want to bring this time down to 15-20 minutes. Is that possible?

Also, for the record, I'm trying this on an Intel i7 processor with 8 GB of RAM.

Upvotes: 8

Views: 13748

Answers (4)

Michał Karpacki
Michał Karpacki

Reputation: 2658

The execution is slow, because you're not using node's asynchronous operations. In essence, you're executing the code like this:

> read some lines
> transform
> write some lines
> repeat

Instead, you could be doing all of these at once, or at least reading and writing concurrently. Some examples in the other answers here do that, but the syntax is rather complicated. Using scramjet you can do it in a couple of simple lines:

const {StringStream} = require('scramjet');

// Reads `path`, transforms every line, and writes the results to `output_file`
// (both names as defined in the question's code).
fs.createReadStream(path, {highWaterMark: 128 * 4096})
    .pipe(new StringStream({maxParallel: 128}))   // I assume this is a UTF-8 file
    .split("\n")                                  // split per line
    .parse((line) => line.split(';'))             // parse line into its ';'-separated fields
    .map((line_arr) => {
        // same field positions as the question's call to perform_line_ops()
        return simplyReturnYourResultForTheOtherFileHere(
            line_arr, line_arr[6], line_arr[7], line_arr[10]
        );                                         // run your code, return promise if you're doing some async work
    })
    .stringify((result) => result.toString())
    .pipe(fs.createWriteStream(output_file))      // createWriteStream must be called with the target path
    .on("finish", () => console.log("done"))
    .on("error", (e) => console.log("error", e))  // log the error itself, not just the word "error"

This will probably run much faster.

Upvotes: 0

teknopaul
teknopaul

Reputation: 6772

I know this is old but...

At a guess, appendFileSync() writes to the file system and waits for the response on every call. Lots of small writes are generally expensive; presuming you used a BufferedWriter in Java, you might get faster results in Node by batching data and skipping some write()s.

Use one of the async writes and see if node buffers sensibly, or write the lines to large node Buffer until it is full and always write a full (or nearly full) Buffer. By tuning the buffer size you could validate if the number of writes affects perf. I suspect it would.

Upvotes: 0

Siggy
Siggy

Reputation: 334

I can't guess where the possible bottleneck is in your code.

  • Can you add the library or the source code of the lazy function?
  • How many operations does your perform_line_ops do? (if/else, switch/case, function calls)

I've created an example based on your code. I know that this does not answer your question, but maybe it helps you understand how Node handles such a case.

const fs = require('fs')
const path = require('path')

// absolute path of the file that is read, resolved relative to this script's directory
const inputFile = path.resolve(__dirname, '../input_file.txt')
// absolute path of the file that is written, resolved relative to this script's directory
const outputFile = path.resolve(__dirname, '../output_file.txt')

/**
 * Removes a leftover output file, if there is one, and then continues with
 * checkInputFile().
 *
 * @throws rethrows any unlink error other than "file does not exist"
 */
function bootstrap() {
    // fs.exists is deprecated, and checking before deleting leaves a window
    // in which the file can appear or disappear. Unlink directly instead and
    // treat ENOENT as "nothing to delete".
    // https://nodejs.org/api/fs.html#fs_fs_unlink_path_callback
    fs.unlink(outputFile, (err) => {
        if (err && err.code !== 'ENOENT') {
            throw err
        }

        if (!err) {
            console.info(`successfully deleted: ${outputFile}`)
        }

        // output file is gone (or never existed), move on
        checkInputFile()
    })
}

/**
 * Verifies that the input file is readable and, if so, starts loading it.
 * An access error is thrown from inside the callback.
 */
function checkInputFile() {
    const onAccessChecked = (accessError) => {
        // unreadable input: nothing sensible to do, so fail loudly
        if (accessError) {
            throw accessError
        }

        // readable input: continue with the actual processing
        loadInputFile()
    }

    // https://nodejs.org/api/fs.html#fs_fs_access_path_mode_callback
    fs.access(inputFile, fs.constants.R_OK, onAccessChecked)
}

/**
 * Opens the output file for writing and returns a writer callback.
 *
 * The returned function accepts:
 *   - null: ends the stream (EOF)
 *   - an array of lines: written newline-separated, with a trailing newline
 *   - anything else: passed to stream.write() unchanged
 * Calls made once the stream is no longer writable are ignored.
 *
 * @returns {function(*): void} writer callback bound to the output stream
 */
function saveToOutput() {
    // create write stream
    // https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options
    const stream = fs.createWriteStream(outputFile, {
        flags: 'w'
    })

    // NOTE: the return value of stream.write() is not checked, so a fast
    // reader can queue a lot of data in memory before it reaches the disk.
    return (data) => {
        // nothing can be written after the stream has ended or failed
        if (!stream.writable) {
            return
        }

        if (data === null) {
            stream.end()
        } else if (Array.isArray(data)) {
            // Terminate the batch with a newline as well; without it the last
            // line of one call and the first line of the next are glued together.
            if (data.length > 0) {
                stream.write(data.join('\n') + '\n')
            }
        } else {
            stream.write(data)
        }
    }
}

/**
 * Placeholder transformation: hands the line back unchanged, wrapped in a
 * one-element array, through the `respond` callback.
 *
 * @param {string} line        one input line
 * @param {function} respond   receives the array of resulting lines
 */
function parseLine(line, respond) {
    const resultLines = Array.of(line)
    respond(resultLines)
}

/**
 * Streams the input file as UTF-8 text, cuts it into '\n'-separated lines and
 * passes each line to parseLine() together with the output writer. When the
 * input ends, a remaining non-empty partial line is processed and the writer
 * is called with null to signal EOF.
 */
function loadInputFile() {
    // writer callback obtained from saveToOutput()
    const saveOutput = saveToOutput()

    // https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options
    const reader = fs.createReadStream(inputFile, {
        flags: 'r',
        encoding: 'utf8',
        autoClose: true
    })

    // text after the last newline seen so far (may be an incomplete line)
    let pending = ''

    reader.on('data', (chunk) => {
        // prepend the leftover from the previous chunk, then cut into lines
        const pieces = (pending + chunk).split('\n')

        // the final piece has no terminating newline yet; keep it for later
        pending = pieces.pop()

        for (const piece of pieces) {
            parseLine(piece, saveOutput)
        }
    })

    reader.on('end', () => {
        // a file that does not end with a newline leaves one last line behind
        if (pending.length > 0) {
            parseLine(pending, saveOutput)
        }

        // null tells the writer that the input is exhausted (EOF)
        saveOutput(null)
    })
}

// kick off the parsing process
bootstrap()

Upvotes: 1

mscdex
mscdex

Reputation: 106696

You can do this easily without a module. For example:

var fs = require('fs');
var inspect = require('util').inspect;

// Text received so far that is not yet terminated by a newline.
var buffer = '';
var rs = fs.createReadStream('foo.log');
// Let the stream decode the bytes. Without an encoding every chunk is a Buffer
// that `buffer + chunk` would stringify on its own, corrupting any multi-byte
// character that happens to be split across two chunks.
rs.setEncoding('utf8');
rs.on('data', function(chunk) {
  var lines = (buffer + chunk).split(/\r?\n/);
  // the last piece may be an incomplete line; keep it for the next chunk
  buffer = lines.pop();
  for (var i = 0; i < lines.length; ++i) {
    // do something with `lines[i]`
    console.log('found line: ' + inspect(lines[i]));
  }
});
rs.on('end', function() {
  // optionally process `buffer` here if you want to treat leftover data without
  // a newline as a "line"
  if (buffer.length > 0) {
    console.log('ended on non-empty buffer: ' + inspect(buffer));
  }
});

Upvotes: 6

Related Questions