MortenMoulder

Reputation: 6646

Insert streamed XML data into database

I am trying to efficiently insert a lot of data (the XML file is over 70GB in size) without crashing my MongoDB server. Currently this is what I am doing using xml-stream in Node.js:

var fs = require('fs'),
    path = require('path'),
    XmlStream = require('xml-stream'),
    MongoClient = require('mongodb').MongoClient,
    assert = require('assert'),
    ObjectId = require('mongodb').ObjectID,
    url = 'mongodb://username:[email protected]:27017/mydatabase',
    amount = 0;

var stream = fs.createReadStream(path.join(__dirname, 'motor.xml'));
var xml = new XmlStream(stream);

xml.collect('ns:Statistik');
xml.on('endElement: ns:Statistik', function(item) {
    var insertDocument = function(db, callback) {
        db.collection('vehicles').insertOne(item, function(err, result) {
            amount++;
            if (amount % 1000 == 0) {
                console.log("Inserted", amount);
            }
            callback();
        });
    };

    MongoClient.connect(url, function(err, db) {
        insertDocument(db, function() {
            db.close();
        });
    });
});

When the xml.on() handler fires it basically hands me the tree/element I am currently on. Since this is straight-up JSON, I can just pass it to my db.collection().insertOne() function as a parameter and it will insert it into the database exactly how I want it.

All of the code actually works as it is now, but it stops after about 3000 insertions (which takes about 10 seconds). I suspect it's because I open a database connection, insert the data, and then close the connection each and every time I see a tree in the XML file, in this case about 3000 times.

I could, somehow, incorporate the insertMany() function and do it in chunks of 100s (or more), but I'm not quite sure how that would work with it all being streamed and asynchronous.

So my question is: How do I insert a large amount of XML (to JSON) into my MongoDB database without it crashing?

Upvotes: 2

Views: 733

Answers (1)

Neil Lunn

Reputation: 151122

You are right to presume .insertMany() would be better than writing every single time, so it's really just a matter of collecting the data on the "stream".

Since the execution is "async", you typically want to avoid having too many active calls on the stack, so usually you .pause() the "stream" before calling the .insertMany() and then .resume() once the callback is complete:

var fs = require('fs'),
    path = require('path'),
    XmlStream = require('xml-stream'),
    MongoClient = require('mongodb').MongoClient,
    url = 'mongodb://username:[email protected]:27017/mydatabase',
    amount = 0;

MongoClient.connect(url, function(err, db) {

    var stream = fs.createReadStream(path.join(__dirname, 'motor.xml'));
    var xml = new XmlStream(stream);

    var docs = [];
    //xml.collect('ns:Statistik');

    // This is your event for the element matches
    xml.on('endElement: ns:Statistik', function(item) {
        docs.push(item);           // collect to array for insertMany
        amount++;

        if ( amount % 1000 === 0 ) { 
          xml.pause();             // pause the stream events
          db.collection('vehicles').insertMany(docs, function(err, result) {
            if (err) throw err;
            docs = [];             // clear the array
            xml.resume();          // resume the stream events
          });
        }
    });

    // End stream handler - insert remaining and close connection
    xml.on("end",function() {
      if ( amount % 1000 !== 0 ) {
        db.collection('vehicles').insertMany(docs, function(err, result) {
          if (err) throw err;
          db.close();
        });
      } else {
        db.close();
      }
    });

});

Or even modernizing it somewhat:

const fs = require('fs'),
      path = require('path'),
      XmlStream = require('xml-stream'),
      MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://username:[email protected]:27017/mydatabase';

(async function() {

  let amount = 0,
      docs = [],
      db;

  try {

    db = await MongoClient.connect(uri);

    const stream = fs.createReadStream(path.join(__dirname, 'motor.xml')),
          xml = new XmlStream(stream);

    await new Promise((resolve, reject) => {
      xml.on('endElement: ns:Statistik', async (item) => {
        docs.push(item);
        amount++;

        if ( amount % 1000 === 0 ) {
          try {
            xml.pause();
            await db.collection('vehicles').insertMany(docs);
            docs = [];
            xml.resume();
          } catch(e) {
            reject(e)
          }
        }

      });

      xml.on('end',resolve);

      xml.on('error',reject);
    });

    if ( amount % 1000 !== 0 ) {
      await db.collection('vehicles').insertMany(docs);
    }

  } catch(e) {
    console.error(e);
  } finally {
    if (db) db.close();   // guard in case the connection itself failed
  }

})();

Note that the MongoClient connection actually wraps all the other operations. You only want to connect once, and the other operations happen on the event handlers for the "stream".

So for your XmlStream, the event handler is triggered on the expression match, and the data is extracted and collected into an array. Every 1000 items, the .insertMany() call is made to insert the documents, "pausing" and "resuming" around the "async" call.

Once complete the "end" event is fired on the "stream". This is where you close the database connection, and the event loop will be released and end the program.

Whilst it's possible to get some degree of "parallelism" by allowing several .insertMany() calls to be in flight at once ( typically capped to a "pooled size" so as not to overrun the call stack, as sketched below ), this is basically how the process looks in its simplest form: simply pausing whilst waiting for the async I/O to complete.
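
For illustration, such a bounded variant might track how many .insertMany() calls are in flight and only pause the stream once that limit is reached. The following is just a sketch under those assumptions, not code from the answer above: it reuses the db and xml variables from the first listing, MAX_INFLIGHT and the batch size of 1000 are arbitrary illustrative values, and end-of-stream handling ( flushing the final partial batch and waiting for outstanding writes ) is omitted for brevity.

var MAX_INFLIGHT = 4,        // illustrative cap on concurrent insertMany() calls
    inflight = 0,
    docs = [];

xml.on('endElement: ns:Statistik', function(item) {
    docs.push(item);         // collect to array as before

    if (docs.length >= 1000) {
        var batch = docs;    // hand the current batch off and start a new one
        docs = [];
        inflight++;

        if (inflight >= MAX_INFLIGHT) {
            xml.pause();     // back-pressure: stop reading once the cap is hit
        }

        db.collection('vehicles').insertMany(batch, function(err) {
            if (err) throw err;
            inflight--;
            if (inflight < MAX_INFLIGHT) {
                xml.resume();   // resume reading once back under the cap
            }
        });
    }
});

Whether this buys much over the simple pause-per-batch version depends on where the bottleneck actually is; for a single collection the straightforward approach above is usually sufficient.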

NOTE: The .collect() method from your original code is commented out because, as per the follow-up question, it would appear not to be necessary, and in fact it retains the nodes in memory, which really should be discarded after each write to the database.

Upvotes: 3
