user4950757

Is there a way to change the MongoDB structure (from nested/embedded documents to a list of object references) while keeping the existing data?

I have a MongoDB database, used by Node.js via Mongoose, and it contains nested/embedded documents like the following:

"people" : [
    {"name" : "james", "_id": ObjectId("randomrandom1")},
    {"name" : "arianna","_id": ObjectId("randomrandom2")},
    {"name" : "kyle","_id": ObjectId("randomrandom3")}
]

I am required to change the structure so that each person becomes a separate 'person' document and people contains an array of ObjectId references to them:

"people" : [{type:mongoose.Schema.Types.ObjectId, ref: 'Person'}]

and each 'person' document would contain the information for james, arianna and kyle, so that I can populate them when I need them.
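
For reference, the target Mongoose setup would look something like this (a rough sketch; the 'Group' parent model name is just a placeholder):

var mongoose = require('mongoose');

// New, separate 'person' documents
var personSchema = new mongoose.Schema({
    name: String
});
var Person = mongoose.model('Person', personSchema);

// Parent documents now only hold ObjectId references
var groupSchema = new mongoose.Schema({
    people: [{ type: mongoose.Schema.Types.ObjectId, ref: 'Person' }]
});
var Group = mongoose.model('Group', groupSchema); // placeholder name for the parent model

// The references can then be populated on demand:
// Group.find().populate('people').exec(function (err, docs) { /* docs[i].people are full person docs */ });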

I need to change the database structure while keeping the documents that have already been entered. Is there a way I can achieve this?

Upvotes: 2

Views: 1568

Answers (2)

chridam

Reputation: 103365

For improved performance, especially when dealing with large collections, take advantage of the Bulk() API to update the collection efficiently, since you will be sending the operations to the server in batches (for example, a batch size of 1000). This gives much better performance because you are not sending every request to the server individually, but only once every 1000 requests, making the updates more efficient and quicker.

To change the database structure, the general algorithm is to "loop" over the collection's find() results and process the updates while having access to the current document's information. Typically you'd want to do the change in bulk, with each update based on information already contained in the document's fields (in your case the people array).

To insert the new documents into the person collection, run an aggregation operation on the old collection that unwinds the embedded people array, groups it by the _id key, and for each grouped document returns the _id and name fields. Use those results to insert the documents into the new collection by looping over them with the Bulk API insert() method, which is the "safest" way of doing this without running all of the code on the server.

Since the aggregate() method returns a cursor, you can use its forEach() method to iterate over it and access each document, setting up the bulk operations in batches that are then sent to the server efficiently with the API.

The following examples demonstrate this approach, both on the server and within your application. The first one uses the Bulk() API, available in MongoDB versions >= 2.6 and < 3.2.

Server side (mongo shell):


// Bulk insert new documents to person collection
var bulkInsertOp = db.person.initializeUnorderedBulkOp(), // initialise the bulk operations on the new person collection
    pipeline = [
        {"$unwind": "$people"},
        {
            "$group": {
                "_id": "$people._id",
                "name": { "$first": "$people.name" }
            }
        }
    ],
    counter =  0, // counter to keep track of the batch insert size
    cursor = db.collection.aggregate(pipeline); // Get person documents using aggregation framework on old collection

cursor.forEach(function(doc){
    bulkInsertOp.insert(doc); // insert the aggregated document to the new person collection
    counter++; // increment counter
    if (counter % 1000 == 0) { // execute the bulk insert operation in batches of 1000
        bulkInsertOp.execute();
        bulkInsertOp = db.person.initializeUnorderedBulkOp();
    }
});

if (counter % 1000 != 0) { bulkInsertOp.execute(); }

// Bulk update the old collection to replace the embedded people subdocuments with their ObjectId references

var bulkUpdateOp = db.collection.initializeUnorderedBulkOp(), // initialise the bulk operations on the old collection
    count = 0, // counter to keep track of the batch update size
    cur = db.collection.find({}); // Get all documents from the old collection

cur.forEach(function(doc){
    var peopleIds = doc.people.map(function(p){ return p._id; }); // Create an array of person ids for referencing
    bulkUpdateOp.find({ "_id": doc._id }).updateOne({ 
        "$set": { "people": peopleIds }
    });
    count++; // increment counter
    if (count % 1000 == 0) { // execute the bulk update operation in batches of 1000
        bulkUpdateOp.execute();
        bulkUpdateOp = db.collection.initializeUnorderedBulkOp();
    }
});

if (count % 1000 != 0) { bulkUpdateOp.execute(); }

The next example applies to MongoDB version 3.2, which has since deprecated the Bulk() API and provided a newer set of APIs based on bulkWrite().

It uses the same cursors as above, but instead of iterating the results it creates the arrays of bulk operations using the cursor's map() method:


var pipeline = [
        {"$unwind": "$people"},
        {
            "$group": {
                "_id": "$people._id",
                "name": { "$first": "$people.name" }
            }
        }
    ],
    cursor = db.collection.aggregate(pipeline),
    bulkInsertOps = cursor.map(function (doc) { 
        return { 
            "insertOne": { "document": doc }         
        };
    }),
    cur = db.collection.find({}),
    bulkUpdateOps = cur.map(function (doc) { 
        var peopleIds = doc.people.map(function(p){ return p._id; });
        return { 
            "updateOne": { 
                "filter": { "_id": doc._id } ,              
                "update": { "$set": { "people": peopleIds } } 
            }         
        };
    });     

db.person.bulkWrite(bulkInsertOps, { "ordered": true });
db.collection.bulkWrite(bulkUpdateOps, { "ordered": true });

Mongoose Implementation

Implementing this on the client side, there are various ways of doing it. You can use the query stream to "plug in" to other Node streams, such as HTTP responses and write streams, so everything "just works" out of the box together with the bulk API.
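
As a minimal sketch of that idea (assuming Model is one of your Mongoose models and res is a writable HTTP response), the query stream can simply be piped:

// Hypothetical example: stream query results straight into an HTTP response
var stream = Model.find({}).stream({ transform: JSON.stringify }); // serialise each document as it flows through

stream.on("error", function (err) {
    // handle err
});

stream.pipe(res); // the query stream is a readable Node stream, so it can be piped like any other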

Within Mongoose, you can do the looping by accessing the underlying collection object from the base driver, making sure that a database connection is open before trying to access the Bulk() API methods. This ensures that a Node.js Db instance is present and a Collection() object can be obtained. Once you use the .collection accessor on the model, you can use the Bulk() API, which is available in Mongoose versions ~3.8.8, ~3.8.22 and 4.x supporting MongoDB Server versions >= 2.6 and < 3.2:

Client side:


// Get the results using an aggregation stream
var mongoose = require('mongoose'),
    async = require('async'); // used for flow control around the batched execute() calls

var pipeline = [
        {"$unwind": "$people"},
        {
            "$group": {
                "_id": "$people._id",
                "name": { "$first": "$people.name" }
            }
        }
    ],
    stream = Model.aggregate(pipeline).stream();

mongoose.connection.on("open", function (err, conn) {

    var bulkInsertOp = Person.collection.initializeUnorderedBulkOp(),
        counter = 0;

    stream.on("error", function(err) {
        // handle err
    });

    stream.on("data", function (doc) {
        async.series(
            [
                function(callback) {

                    bulkInsertOp.insert(doc);
                    counter++;

                    if (counter % 1000 == 0) { // execute the bulk insert operation in batches of 1000
                        bulkInsertOp.execute(function(err, result) {
                            if (err) return callback(err); // handle err appropriately
                            bulkInsertOp = Person.collection.initializeUnorderedBulkOp(); // re-initialise bulk operations
                            callback(); // do something with result
                        });
                    } else {
                        callback();
                    }
                }
            ],
            function(err) {
                if (err) throw err; // handle err appropriately
            }
        );
    });

    // When the stream is drained, flush any remaining operations in the queue
    stream.on("end", function () {
        if (counter % 1000 != 0) {
            bulkInsertOp.execute(function(err, result) {
                console.log("Inserted the remaining docs from the batch queue.");
            });
        }
        console.log("I'm done now!");
    });
});

In the above, the stream API breaks the aggregation results up so you can process one document at a time, which allows you to build up your inserts in batches and then send them to the server in bulk instead of loading everything at once.

Bulk() then queues up operations before actually sending them to the server, so in the case above writes are only sent to the server for processing in batches of 1000 entries. You can really choose anything up to the 16MB BSON limit, but keep it manageable.

On top of the operations being processed in bulk, the async library acts as an additional limiter, ensuring that essentially no more than the batch limit of documents is in flight at any time. This guards against making expensive execute() calls too often by letting operations wait in the queue rather than piling up.
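
For completeness, the second half of the migration (replacing the embedded people subdocuments with their ObjectIds) can follow the same streaming pattern on the client. A minimal sketch, assuming Model is the Mongoose model for the old collection:

mongoose.connection.on("open", function () {

    var bulkUpdateOp = Model.collection.initializeUnorderedBulkOp(),
        counter = 0,
        stream = Model.find({}).stream();

    stream.on("error", function (err) {
        // handle err
    });

    stream.on("data", function (doc) {
        var peopleIds = doc.people.map(function (p) { return p._id; }); // array of person ids for referencing
        bulkUpdateOp.find({ "_id": doc._id }).updateOne({ "$set": { "people": peopleIds } });
        counter++;

        if (counter % 1000 == 0) { // flush in batches of 1000
            stream.pause(); // stop reading while the batch is written
            bulkUpdateOp.execute(function (err, result) {
                if (err) throw err; // handle err appropriately
                bulkUpdateOp = Model.collection.initializeUnorderedBulkOp();
                stream.resume();
            });
        }
    });

    stream.on("end", function () {
        if (counter % 1000 != 0) { // clean up any remaining operations in the queue
            bulkUpdateOp.execute(function (err, result) {
                if (err) throw err;
                console.log("Updated the remaining docs from the batch queue.");
            });
        }
    });
});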

Upvotes: 0

dikesh

Reputation: 3125

Suppose my documents are in a collection called coll, like this:

{
    "_id" : ObjectId("56b47c7a088d9fa3e1aa77a0"),
    "people" : [
        {
            "name" : "james",
            "_id" : ObjectId("56b47c7a088d9fa3e1aa779d")
        },
        {
            "name" : "arianna",
            "_id" : ObjectId("56b47c7a088d9fa3e1aa779e")
        },
        {
            "name" : "kyle",
            "_id" : ObjectId("56b47c7a088d9fa3e1aa779f")
        }
    ]
}

Now I can run an aggregation to store all the _id values in another collection, like this:

db.coll.aggregate([
    {
        $project: {
            _id : 0,
            'people._id' : 1
        }
    },
    {
        $out : 'somecoll'
    }
])

This will store all the IDs in another collection called somecoll as below:

{
    "_id" : ObjectId("56b47de8b47e47b58b64f312"),
    "people" : [
        {
            "_id" : ObjectId("56b47c7a088d9fa3e1aa779d")
        },
        {
            "_id" : ObjectId("56b47c7a088d9fa3e1aa779e")
        },
        {
            "_id" : ObjectId("56b47c7a088d9fa3e1aa779f")
        }
    ]
}
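
Note that the people array in somecoll still holds subdocuments of the form { "_id" : ... } rather than a flat array of ObjectIds. If you want plain ObjectIds (to match the people : [ObjectId, ...] shape), one possible variant is a $map projection (available since MongoDB 2.6):

db.coll.aggregate([
    {
        $project: {
            _id : 0,
            people : { $map: { input: "$people", as: "p", in: "$$p._id" } }
        }
    },
    {
        $out : 'somecoll'
    }
])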

Upvotes: 1
