pac
pac

Reputation: 511

Read individual JSON from log file containing multiple lines of JSON

I'm trying to read a log file where each entry is a line of JSON(JSON structured text).

What I ultimately hope to do is iterate over each line and if

"Event":"SparkListenerTaskEnd"

is found that JSON line will be parsed for the values of keys "Finish Time" and "Executor CPU Time".

I'm new to node.js so not may be completely wrong but so far I've got this block of code for iterating through the file:

exports.getId(function(err, id){
    console.log(id);
    var data = fs.readFileSync('../PC Files/' + id, 'utf8', function(err, data) {
        var content = data.split('\n');
        async.map(content, function (item, callback) {
            callback(null, JSON.parse(item));
        }, function (err, content) {
            console.log(content);
        });
    });
    //console.log(data);
});

This doesn't seem to be doing anything though. However, I know the log file can be read as I can see it if I uncomment //console.log(data);.

Below is an example JSON line that I'm talking about:

{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1514983570810,"Executor ID":"0","Host":"192.168.111.123","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1514983574496,"Failed":false,"Killed":false,"Accumulables":[{"ID":22,"Name":"internal.metrics.input.recordsRead","Update":99171,"Value":99171,"Internal":true,"Count Failed Values":true},{"ID":20,"Name":"internal.metrics.shuffle.write.writeTime","Update":5893440,"Value":5893440,"Internal":true,"Count Failed Values":true},{"ID":19,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":3872,"Value":3872,"Internal":true,"Count Failed Values":true},{"ID":18,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":1468516,"Value":1468516,"Internal":true,"Count Failed Values":true},{"ID":10,"Name":"internal.metrics.peakExecutionMemory","Update":16842752,"Value":16842752,"Internal":true,"Count Failed Values":true},{"ID":9,"Name":"internal.metrics.diskBytesSpilled","Update":0,"Value":0,"Internal":true,"Count Failed Values":true},{"ID":8,"Name":"internal.metrics.memoryBytesSpilled","Update":0,"Value":0,"Internal":true,"Count Failed Values":true},{"ID":7,"Name":"internal.metrics.resultSerializationTime","Update":1,"Value":1,"Internal":true,"Count Failed Values":true},{"ID":6,"Name":"internal.metrics.jvmGCTime","Update":103,"Value":103,"Internal":true,"Count Failed Values":true},{"ID":5,"Name":"internal.metrics.resultSize","Update":2597,"Value":2597,"Internal":true,"Count Failed Values":true},{"ID":4,"Name":"internal.metrics.executorCpuTime","Update":1207164005,"Value":1207164005,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorRunTime","Update":2738,"Value":2738,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorDeserializeCpuTime","Update":542927064,"Value":542927064,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeTime","Update":835,"Value":835,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":835,"Executor Deserialize CPU Time":542927064,"Executor Run Time":2738,"Executor CPU Time":1207164005,"Result Size":2597,"JVM GC Time":103,"Result Serialization Time":1,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":1468516,"Shuffle Write Time":5893440,"Shuffle Records Written":3872},"Input Metrics":{"Bytes Read":0,"Records Read":99171},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[{"Block ID":"broadcast_1_piece0","Status":{"Storage Level":{"Use Disk":false,"Use Memory":true,"Deserialized":false,"Replication":1},"Memory Size":5941,"Disk Size":0}},{"Block ID":"broadcast_1","Status":{"Storage Level":{"Use Disk":false,"Use Memory":true,"Deserialized":true,"Replication":1},"Memory Size":9568,"Disk Size":0}},{"Block ID":"broadcast_0_piece0","Status":{"Storage Level":{"Use Disk":false,"Use Memory":true,"Deserialized":false,"Replication":1},"Memory Size":25132,"Disk Size":0}},{"Block ID":"broadcast_0","Status":{"Storage Level":{"Use Disk":false,"Use Memory":true,"Deserialized":true,"Replication":1},"Memory Size":390808,"Disk Size":0}}]}}

Update Here is my whole code. I'm sure it's not pretty but it works. I'll now look at improving it.

var http = require("http");
var fs = require('fs');
var async = require('async');
var readline = require('readline')


//get file name
var options =  {
    "method" : "GET",
    "hostname" : "xxx.xxx.xxx.xxx",
    "port" : "18080",
    "path" : "/api/v1/applications/"
};
exports.getId = function(callback) {
    var req = http.request(options, function (res) {

        var chunks = [];

        res.on("data", function (chunk) {
            chunks.push(chunk);
        });

        res.on("end", function () {
            var body = JSON.parse(Buffer.concat(chunks));

            var arrFound = Object.keys(body).filter(function(key) {
                if (body[key].name.indexOf("TestName") > -1) {
                    return body[key].name;
                }
            }).reduce(function(obj, key){
                obj = body[key].id;
                return obj;
            }, {});;
            //console.log("ID: ", arrFound);
            callback(null, arrFound);
        });
    });
    req.end();
}

// parse log file line at a time and for any use lines where the Event = SparkListenerTaskEnd
exports.getId(function(err, id){
    console.log(id);
    var lineReader = readline.createInterface({
        input: fs.createReadStream('../PC Files/' + id, 'utf8')
      });

    lineReader.on('line', function (line) {
        var obj = JSON.parse(line);
        if(obj.Event == "SparkListenerTaskEnd") {
            console.log('Line from file:', obj['Task Info']['Finish Time']);
        }
      });
});

Adam, I tried your suggested code but got the following error:

null
fs.js:646
  return binding.open(pathModule._makeLong(path), stringToFlags(flags), mode);
             ^

Error: ENOENT: no such file or directory, open '../PC Files/null'
    at Object.fs.openSync (fs.js:646:18)
    at Object.fs.readFileSync (fs.js:551:33)
    at /test-runner/modules/getEventLog.js:61:19
    at IncomingMessage.<anonymous> (/test-runner/modules/getEventLog.js:35:13)
    at emitNone (events.js:111:20)
    at IncomingMessage.emit (events.js:208:7)
    at endReadableNT (_stream_readable.js:1056:12)
    at _combinedTickCallback (internal/process/next_tick.js:138:11)
    at process._tickCallback (internal/process/next_tick.js:180:9)

Upvotes: 0

Views: 1225

Answers (1)

Adam Patterson
Adam Patterson

Reputation: 958

At first glance, it appears you are using callbacks incorrectly.

I assume you are using the getId function like:

getId(function(error, data) {
  // Do something with data
}

In which case, the callback function should be returned like:

// Remove the error, this will not be entered as a parameter
// Add callback as parameter
exports.getId(function(id, callback){
    console.log(id);
    var data = fs.readFileSync('../PC Files/' + id, 'utf8', function(err, data) {
        var content = data.split('\n');
        // Removed callback from here
        // We will not have access to the
        // to it here
        async.map(content, function (item) {
            callback(null, JSON.parse(item));
        // Add callback with error in place of null
        }, function (err, content) {
            callback(err)
            console.log(content);
        });
    });
    //console.log(data);
});

Upvotes: 1

Related Questions