unsalted

Reputation: 353

Index/ingest a very large JSON file into a database with Node.js

I inherited a huge JSON file that I'm trying to index into Elasticsearch (not really a database, but don't get hung up on ES; this should apply to most database ingests). I'm using Node to do the ingest. I've tried streams and async, but I'm stumped: I have no framework for approaching this problem without running out of memory.

I can't post the actual data, but it's effectively an array of nested (multidimensional) objects that looks something like:

[
  {
    document: {
      type: 1,
      type2: 2,
      type3: {...}
    }
  },
  {...}
]

I just need to ingest the docs; I can use the Elasticsearch client and process them in bulk. What I need is a way to slow down the stream, parse it, and chunk it.

Totally stuck... Help, Stack Overflow, it's Friday and I want to go home ;).

Upvotes: 0

Views: 823

Answers (1)

unsalted

Reputation: 353

Based on migg's suggestion of json-parse-stream (the third JSON stream library I tried), I finally have a working ingest. As a matter of fact, it is running as I write this. Hopefully someone will find this useful.

const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const elasticsearch = require('elasticsearch');
const { WritableBulk, TransformToBulk } = require('elasticsearch-streams');

// Apply `new` to Client itself; `new require(...).Client()` would apply it to `require`
const client = new elasticsearch.Client();


var rs = fs.createReadStream('./resources/mydoc.json');

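// Sends one bulk request; WritableBulk calls this with a ready-made bulk body
var bulkExec = function (body, callback) {
  console.log(body);
  client.bulk({
    index: 'my_index',
    type: 'my_type',
    body: body
  }, callback);
};

// Supplies the index and type used for the bulk action of each document
var toBulk = new TransformToBulk(() => { return { _index: 'my_index', _type: 'my_type' }; });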


// 'finish' listeners receive no arguments, so there is no err or res to log here
const done = () => {
  console.log('go get a drink you deserve it');
};

var ws = new WritableBulk(bulkExec);

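rs.pipe(parse())
  // Keep only the values stored under a `document` key;
  // mapSync emits nothing when the callback returns undefined
  .pipe(es.mapSync(function (element) {
    if (element.key === 'document') {
      return element.value;
    }
  }))
  .pipe(toBulk)
  .pipe(ws)
  .on('error', (err) => console.log(err))
  .on('finish', done);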
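
For the "slow down the stream and chunk" part of the question, here is a minimal sketch that uses only Node's built-in stream module, in case it helps to see the mechanics without the helper libraries. The names batch, bulkSink, and fakeSend are hypothetical and not part of any library; fakeSend just stands in for a real bulk request, and the documents are made up.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Hypothetical helper: collects incoming documents into arrays of up to `size`.
function batch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(doc, _enc, cb) {
      buffer.push(doc);
      if (buffer.length >= size) {
        const full = buffer;
        buffer = [];
        cb(null, full); // emit one full batch downstream
      } else {
        cb(); // keep collecting
      }
    },
    flush(cb) {
      // Emit whatever is left when the input ends
      if (buffer.length > 0) this.push(buffer);
      cb();
    }
  });
}

// Hypothetical sink: `send` is any function that takes an array of documents
// and returns a promise. write() is not called again until cb fires, so the
// upstream is paused while a request is in flight.
function bulkSink(send) {
  return new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(docs, _enc, cb) {
      send(docs).then(() => cb(), cb);
    }
  });
}

// Demo with in-memory documents and a fake sender (assumption: a real
// sender would issue the bulk request instead of a timeout).
const fakeSend = (docs) =>
  new Promise((resolve) => setTimeout(() => {
    console.log('sent batch of', docs.length);
    resolve();
  }, 10));

pipeline(
  Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }]),
  batch(2),
  bulkSink(fakeSend),
  (err) => console.log(err ? 'failed: ' + err.message : 'all batches sent')
);
```

In a real ingest, the Readable.from(...) source would be replaced by the parsed document stream, and fakeSend by a function that builds and sends the bulk body. pipeline also forwards errors from any stage to the final callback, which chained .pipe() calls do not do on their own.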

Upvotes: 2
