Reputation: 456
I have about 20M documents in a MongoDB Collection. now I want to add a field called "score", the score value is calculated via a machine learning model. Now my task is to go through each document in MongoDB and calculate the score and update the document. But the issue is the updating process is taking too long to complete.
Here is the code I am currently using.
const config = require('./config');
const mongoose = require('mongoose');
const audit = require("./lib/Audit")
const threads = 4;
let completed = 0;
let documents = 0;
mongoose.connect(config.db.uri, {useNewUrlParser: true, useCreateIndex: true});
console.log("Connected to mongodb!");
const Schema = require('./models/Schema.js');
const CliProgress = require('cli-progress');
const bar1 = new CliProgress.Bar({
etaBuffer: 5000,
format: '[{bar}] {percentage}% | ETA: {eta_formatted} | {value}/{total}'
}, CliProgress.Presets.shades_grey);
(async function (ref) {
documents = await Schema.find({'score': null}).count();
completed = 0;
bar1.start(documents, completed);
let corsor = Schema.find({'score': null}).cursor();
corsor.eachAsync((doc) => {
// console.log(update)
doc.lastmod = Date.now();
// Calculating The Score
doc.score = audit(doc.toObject())
doc.save();
// Saving To Database
completed ++;
bar1.update(completed);
return true;
}, {parallel: threads })
})();
Have a look and let me know how can I speed up the process? I already tried by increasing the value of the thread but it has little to no effect in operation speed.
Upvotes: 2
Views: 541
Reputation: 456
I got a solution to speed up the process quite a bit! By using bulk operation in this way This method is about 20 times faster in my case.
const config = require('./config');
const mongoose = require('mongoose');
const Audit = require("./lib/Audit");
const ObjectId = require('mongodb').ObjectId;
let completed = 0;
let documents = 0;
let timeouts = null;
mongoose.connect(config.db.uri, {useNewUrlParser: true, useCreateIndex: true});
console.log("Connected to mongodb!");
console.log('\033[2J');
const Schema = require('./models/Schema.js');
const CliProgress = require('cli-progress');
const bar1 = new CliProgress.Bar({
etaBuffer: 5000,
format: '[{bar}] {percentage}% | ETA: {eta_formatted} | {value}/{total}'
}, CliProgress.Presets.shades_grey);
(async function (ref) {
documents = await Schema.find({}).count();
completed = 0;
bar1.start(documents, completed);
let corsor = Schema.find().lean().cursor();
let bulk = Schema.collection.initializeOrderedBulkOp();
corsor.on("data", (doc) => {
// console.log(update)
doc.lastmod = Date.now();
doc.swarm.verified = true;
if(doc.swarm.seeders || doc.swarm.leechers){
doc.swarm.audit = Audit(doc);
}
bulk.find({_id: ObjectId(doc._id)}).updateOne({ $set: { "swarm": doc.swarm, lastmod: doc.lastmod }});
completed ++;
bar1.update(completed);
return true;
});
corsor.on('end',() => {
clearTimeout(timeouts);
bulk.execute(()=> {
console.log('\033[2J');
console.log('Task Completed!');
setTimeout(()=> {
process.exit();
}, 5000)
});
});
// {parallel: 200}
function start(){
timeouts = setTimeout(() => {
doTasks()
}, 120000);
}
start();
function doTasks() {
corsor.pause();
clearTimeout(timeouts);
console.log('\033[2J');
console.log('Updating Documents.....');
console.log('Waiting For Complete.....');
bulk.execute(()=> {
// Done Writing Documents
start()
console.log('\033[2J');
console.log('Resuming task........');
// Start another Bulk Operation
bulk = Schema.collection.initializeOrderedBulkOp();
// Resume The cursor
corsor.resume();
});
}
})();
Upvotes: 1