Reputation: 353
I inherited a huge JSON file that I'm trying to index into Elasticsearch (not really a database, I know, but don't get hung up on ES; this should apply to most database ingests). I'm using Node to do the ingest. I've tried streams and async, but I'm stumped; I have no frame of reference for approaching this problem without memory overflows and the like.
I can't post it 1:1, but it's effectively a multidimensional object that looks something like:
[
  {
    document: {
      type: 1,
      type2: 2,
      type3: {...}
    }
  },
  {...}
]
I just need to ingest the docs. I can use the Elasticsearch client and process them in bulk, so I need to slow down the stream, parse, and chunk.
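To be concrete about the "chunk" part: something shaped like this batching transform is what I'm after (just a rough sketch with plain Node object streams; the Batcher name and the batch size of 500 are made up). The part I can't work out is how to feed the parsed documents into it without reading the whole file:
const { Transform } = require('stream');

// Object-mode transform that buffers incoming docs and emits them as arrays.
class Batcher extends Transform {
  constructor(size) {
    super({ objectMode: true });
    this.size = size;
    this.buffer = [];
  }
  _transform(doc, enc, callback) {
    this.buffer.push(doc);
    if (this.buffer.length >= this.size) {
      this.push(this.buffer);   // hand a full batch downstream
      this.buffer = [];
    }
    callback();
  }
  _flush(callback) {
    if (this.buffer.length) this.push(this.buffer);   // last partial batch
    callback();
  }
}

const batcher = new Batcher(500);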
Totally stuck... Help, Stack Overflow, it's Friday and I want to go home ; ).
Upvotes: 0
Views: 823
Reputation: 353
Based on migg's suggestion of json-parse-stream (the third JSON stream library I tried), I finally have a working ingest. As a matter of fact, it is running as I write this. Hopefully someone will find this useful.
const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const client = new (require('elasticsearch').Client)();
const WritableBulk = require('elasticsearch-streams').WritableBulk;
const TransformToBulk = require('elasticsearch-streams').TransformToBulk;

const rs = fs.createReadStream('./resources/mydoc.json');

// Runs one bulk request per batch that WritableBulk hands over.
const bulkExec = function (body, callback) {
  console.log(body);
  client.bulk({
    index: 'my_index',
    type: 'my_type',
    body: body
  }, callback);
};

// Wraps each document in the action metadata the bulk API expects.
const toBulk = new TransformToBulk(() => ({ _index: 'my_index', _type: 'my_type' }));

const done = (err, res) => {
  if (err) {
    console.log(err);
  }
  console.log(res);
  console.log('go get a drink you deserve it');
};

const ws = new WritableBulk(bulkExec);

rs.pipe(parse())
  // json-parse-stream emits { key, value } pairs; keep only the document
  // objects - returning undefined from mapSync drops everything else.
  .pipe(es.mapSync(function (element) {
    if (element.key === 'document') {
      return element.value;
    }
  }))
  .pipe(toBulk)
  .pipe(ws)
  .on('finish', done);
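For anyone who would rather skip the elasticsearch-streams dependency, the same chunking can be done by hand: collect the parsed documents into batches, send each batch with client.bulk, and pause the read stream until the request comes back. This is only a rough sketch under the same setup as above (same file path, index, and type; the batch size of 500 is made up):
const fs = require('graceful-fs');
const parse = require('json-parse-stream');
const es = require('event-stream');
const client = new (require('elasticsearch').Client)();

const BATCH_SIZE = 500;   // documents per bulk request - tune for your data
let batch = [];           // alternating action/source entries for client.bulk

const docs = fs.createReadStream('./resources/mydoc.json')
  .pipe(parse())
  .pipe(es.mapSync((element) => element.key === 'document' ? element.value : undefined));

docs.on('data', (doc) => {
  // The bulk body alternates an action line and the document source.
  batch.push({ index: { _index: 'my_index', _type: 'my_type' } }, doc);
  if (batch.length >= BATCH_SIZE * 2) {
    docs.pause();   // back-pressure: stop reading while this batch indexes
    client.bulk({ body: batch }, (err) => {
      if (err) { console.log(err); }
      batch = [];
      docs.resume();
    });
  }
});

docs.on('end', () => {
  // Flush whatever is left over.
  if (batch.length) {
    client.bulk({ body: batch }, (err) => {
      if (err) { console.log(err); }
      console.log('all done');
    });
  }
});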
Upvotes: 2