4m1r

Reputation: 12542

Creating synchronous queries with node-mysql

I'm trying to ensure that one mysql query leads to another and is not considered complete until all of its child queries have completed. So, for example, I start with one select, stream rows, and execute subsequent queries from each row result. This is doable with callbacks, but I end up running out of memory, so I'd like to slow down the process and run it in batches. Due to the async nature of the dispatch, though, I can't keep things in phase and end the connection only after all the rows have been processed.

Here's an example:

var query = conn.query('select id from table1 limit 10');

query.on('result', function(row){
    console.log('query1', row);
    var query2 = conn.query('select id from books where id = ?', [row.id]);
    query2.on('result', function(row2){
        console.log('query2', row2);
        var query3 = conn.query('insert into test (id) values (?)', [row2.id]);
        query3.on('result', function(row3){
            console.log(row3);
        });
    });
});

query.on('end', function(){
    conn.end();
});

The above fails because there are still rows to process in query3 after the initial query has ended.
Any thoughts? The actual code is even more complicated: I have to process XML from the subsequent queries and fire off even more inserts as I loop through the batch.

Thanks!

Upvotes: 4

Views: 13724

Answers (4)

Jithin Joseph

Reputation: 151

This is what I did (db here is a mysql connection and res an Express response from the enclosing handler):

db.query(
    "select name from USER where name = ?",
    ["test"],
    (err, result) => {
        if (err) {
            console.log("Error : ", err);
        } else if (result.length <= 0) {
            res.json("Not Found");
        } else {
            console.log("name found, executing update query!");
            updateAgeIfUserFound("test", 25); // calling function with 2nd query; 25 is an example age
        }
    }
);

// Update age only if name is present
function updateAgeIfUserFound(name, age) {
    if (name) {
        db.query(
            "update USER set age = ? where name = ?",
            [age, name],
            (err, result) => {
                if (err) throw err;
                console.log("Age Updated");
                res.json("Age Updated");
            }
        );
    }
}
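
For context, here is a minimal sketch of the surrounding setup this snippet appears to assume; the route path, port, and connection options are illustrative, not part of the original answer:

var express = require('express');
var mysql = require('mysql');

var app = express();

// Assumed connection config; adjust to your environment.
var db = mysql.createConnection({
    host: 'localhost',
    user: 'user',
    password: 'password',
    database: 'database'
});

// Assumed route handler supplying the db and res used above.
app.get('/user', function (req, res) {
    // ...place the select/update logic from above here...
});

app.listen(3000);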

Upvotes: 0

EscapeNetscape

Reputation: 2939

In my opinion the best solution is to make the code synchronous in a very easy way.

You could use the "synchronize" package.

Just

npm install synchronize

Then var sync = require('synchronize');

Put the logic that should run synchronously into a fiber by using

sync.fiber(function() { /* put your logic here */ });

An example for two mysql queries:

var express = require('express');
var bodyParser = require('body-parser');
var mysql = require('mysql');
var sync = require('synchronize');

var db = mysql.createConnection({
    host     : 'localhost',
    user     : 'user',
    password : 'password',
    database : 'database'
});

db.connect(function(err) {
    if (err) {
        console.error('error connecting: ' + err.stack);
        return;
    }
});

function saveSomething() {
    var post = { /* column values for the new mainTable row */ };
    // No callback here; sync.await blocks the fiber until the query
    // finishes and "query" receives the result.
    var query = sync.await(db.query('INSERT INTO mainTable SET ?', post, sync.defer()));
    var newId = query.insertId;
    post = {foreignKey: newId};
    // This query can stay async, because nothing below depends on its result.
    db.query('INSERT INTO subTable SET ?', post, function(err, result) {
        if (err) throw err;
    });
}

When "saveSomething()" is called, it inserts a row in a main table and receives the last inserted id. After that the code below will be executed. No need for nesting promises or stuff like that.

Upvotes: 2

4m1r

Reputation: 12542

@glukki, thanks for the great answer and the reference to async. I went with a permutation of your code and two async queues, which do a 'chomp and chew' using a single connection plus a pool of connections to process a select of over 100K rows into 1.2M row inserts. It worked amazingly well and took less than 10 minutes. Here's the full implementation, minus the module and connection setup. I hope this helps someone else too. Thanks again!

function populateMesh(row, callback){

    xmlParser.parseString('<root>' + row.mesh_heading_list + '</root>', function(err, result){

        // inner queue: up to 50 concurrent inserts drawn from the pool
        var q2 = async.queue(function (task, cb) {

            pool.getConnection(function(err, cnx){
                if (err) { throw err; }
                cnx.query('INSERT INTO abstract_mesh (mesh_id, abstract_id, major_topic) SELECT mesh_descriptor.id, ?, ? FROM mesh_descriptor WHERE mesh_descriptor.name = ?', [task.id, task.majorTopic, task.descriptorName], function(err, result){
                    if (err) { throw err; }
                    cnx.release();
                    cb();
                });
            });

        }, 50);

        // fires once every insert for this row has finished
        q2.drain = function() {
            //console.log('all mesh processed');
            callback();
        };

        if (!(result.root instanceof Object)) {
            //console.log('its not obj!', row.id);
            q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Null'}, function (err) {});
        }

        for (var i in result.root.MeshHeading) {
            //console.log('in loop', result.root.MeshHeading[i].DescriptorName);
            if (typeof result.root.MeshHeading[i].DescriptorName === 'undefined') {
                q2.push({id: row.id, majorTopic: 'N', descriptorName: 'Emergency'}, function (err) {});
            }

            for (var j in result.root.MeshHeading[i].DescriptorName) {

                var descriptorName = result.root.MeshHeading[i].DescriptorName[j]._;
                var majorTopic = result.root.MeshHeading[i].DescriptorName[j].$.MajorTopicYN;

                q2.push({id: row.id, majorTopic: majorTopic, descriptorName: descriptorName}, function (err) {});
            }
        }
    });

}


// here goes task serving logic
// if any async function should be finished before drain callback, push them into q
var q = async.queue(function (row, callback) {
    console.log('got id: ' + row.id);
    populateMesh(row, function(){
        callback();
    });
}, 10);

q.drain = function() {
    console.log('all items have been processed');
    conn.end(function(err){
        console.log('connection ended');
    });
    pool.end(function(err){
        console.log('pool closed');
    });
};

var truncate = conn.query('truncate abstract_mesh');

var select = conn.query('SELECT id, mesh_heading_list FROM pubtbl');

select.on('result', function(result){
    //console.log(result);
    q.push(result, function (err) {
        //console.log('finished processing row');
    });
});

Upvotes: 2

glukki

Reputation: 2746

I would suggest this solution with async module:

var async = require("async");
// connection instance
var conn;

// here goes task serving logic
// if any async function should be finished before drain callback, push them into q
var solvers = {
    query: function(q, task, row){
        console.log('query1', row);
        q.push({
            solver: "query2",
            req: "select id from books where id = ?",
            reqArgs: [row.id]
        });
    },
    query2: function(q, task, row){
        console.log('query2', row);
        q.push({
            solver: "query3",
            req: "insert into test (id) values (?)",
            reqArgs: [row.id]
        });
    },
    query3: function(q, task, row){
        console.log(row);
    }
}

// here is a queue of tasks
var q = async.queue(function(task, cb){
    var query = conn.query(task.req, task.reqArgs);
    query.on("end", cb);
    query.on("result",function(row){
        solvers[task.solver](q, task, row);
    });
}, 2); // limit of parallel queries

// when every request has reached "end"
q.drain = function(){
    conn.end();
    // continue from here
};

// initial task
q.push({
    solver: "query",
    req: "select id from table1 limit 10",
    reqArgs: []
});

But still, I'm not sure that making requests ID by ID is a good solution.
Maybe I'm just not aware of the full problem.

Upvotes: 2
