justDan
justDan

Reputation: 2336

Async.waterfall: Callback already was called?

I know this is a topic that has been brought up here before, but I'm using async.waterfall with rethinkdb and I'm getting Error: Callback was already called. The odd part is that even though it throws that error and crashes the app, it still creates the database and tables I need. I have read a few other posts with answers, like NodeJS Async: Callback already called? or Using async.waterfall, but I can't seem to get anywhere. My console also tells me the error is at db.js:40:9, but I'm new to Node and just not sure what it wants with the callbacks. What am I doing wrong? Do I need to nest my callbacks here? The code I am using is posted below. Any help I can get here is greatly appreciated, and if need be I can post other relevant code. Thanks guys.

db.js:

    // Sets up the RethinkDB database, the 'todos' and 'users' tables and their
    // 'createdAt' indexes by running the steps below through async.waterfall.
    // Each step receives the connection and forwards it to the next step via
    // callback(err, connection).
    exports.setDatabaseAndTables = function() {
        async.waterfall([

        // Step 1: open the connection. `callback` is handed straight to
        // r.connect, so whatever r.connect reports becomes the input of the
        // next step.
        function connect(callback) {
          r.connect(config.rethinkdb, callback);
        },

        // Step 2: create the database named in config.rethinkdb.db unless
        // dbList() already contains it; {created: 0} is the result of the
        // "already exists" branch.
        function createDatabase(connection, callback) {
          // Create the database if needed.
          r.dbList().contains(config.rethinkdb.db).do(function(containsDb) {
            return r.branch(
              containsDb,
              {created: 0},
              r.dbCreate(config.rethinkdb.db)
            );
          }).run(connection, function(err) {
            callback(err, connection);
          });
        },

        // Step 3: create the 'todos' and 'users' tables if they are missing.
        // NOTE(review): two independent queries are started here and each
        // run() handler invokes `callback`, so this single step calls
        // `callback` twice. That matches the reported
        // "Callback was already called" error.
        function createTable(connection, callback) {
          // Create the tables if needed.
          r.tableList().contains('todos').do(function(containsTable) {
            return r.branch(
              containsTable,
              {created: 0},
              r.tableCreate('todos')
            );
          }).run(connection, function(err) {
            callback(err, connection);
          });
          r.tableList().contains('users').do(function(containsTable) {
            return r.branch(
              containsTable,
              {created: 0},
              r.tableCreate('users')
            );
          }).run(connection, function(err) {
            callback(err, connection);
          });
        },

        // Step 4: create the 'createdAt' index on both tables if missing.
        // NOTE(review): same pattern as createTable, with `callback` invoked
        // once per query, i.e. twice for this step.
        function createIndex(connection, callback) {
          // Create the indexes if needed.
          r.table('todos').indexList().contains('createdAt').do(function(hasIndex) {
            return r.branch(
              hasIndex,
              {created: 0},
              r.table('todos').indexCreate('createdAt')
            );
          }).run(connection, function(err) {
            callback(err, connection);
          });
          r.table('users').indexList().contains('createdAt').do(function(hasIndex) {
            return r.branch(
              hasIndex,
              {created: 0},
              r.table('users').indexCreate('createdAt')
            );
          }).run(connection, function(err) {
            callback(err, connection);
          });
        },

        // Step 5: wait for the 'createdAt' index on both tables.
        // NOTE(review): `callback` is again invoked from both run() handlers.
        // The `result` argument of each handler is unused.
        function waitForIndex(connection, callback) {
          // Wait for the index to be ready.
          r.table('todos').indexWait('createdAt').run(connection, function(err, result) {
            callback(err, connection);
          });
          r.table('users').indexWait('createdAt').run(connection, function(err, result) {
            callback(err, connection);
          });
        }
      ], 

      // Final handler: on error, log it and exit the process with status 1.
      // On success nothing is done with `connection`.
      function(err, connection) {
        if(err) {
          console.error(err);
          process.exit(1);
          return;
        }
      });
    };

Upvotes: 1

Views: 356

Answers (1)

evilReiko
evilReiko

Reputation: 20473

This is a common problem for those trying out "async.waterfall".

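Each step passed to async.waterfall must call its callback exactly once, but "createTable", "createIndex" and "waitForIndex" each start two queries and call the callback from both of them. The solution is to split each of these steps into 2 functions, one per table: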

exports.setDatabaseAndTables = function() {
    async.waterfall([

        function connect(callback) {
            r.connect(config.rethinkdb, callback);
        },

        function createDatabase(connection, callback) {
            // Create the database if needed.
            r.dbList().contains(config.rethinkdb.db).do(function(containsDb) {
                return r.branch(
                        containsDb,
                        {created: 0},
                        r.dbCreate(config.rethinkdb.db)
                        );
            }).run(connection, function(err) {
                callback(err, connection);
            });
        },

        function createTableTodos(connection, callback) {
            // Create the tables if needed.
            r.tableList().contains('todos').do(function(containsTable) {
                return r.branch(
                        containsTable,
                        {created: 0},
                        r.tableCreate('todos')
                        );
            }).run(connection, function(err) {
                callback(err, connection);
            });
        },

        function createTableUsers(connection, callback) {
            r.tableList().contains('users').do(function(containsTable) {
                return r.branch(
                        containsTable,
                        {created: 0},
                        r.tableCreate('users')
                        );
            }).run(connection, function(err) {
                callback(err, connection);
            });
        },

        function createIndexTodos(connection, callback) {
            // Create the indexes if needed.
            r.table('todos').indexList().contains('createdAt').do(function(hasIndex) {
                return r.branch(
                        hasIndex,
                        {created: 0},
                        r.table('todos').indexCreate('createdAt')
                        );
            }).run(connection, function(err) {
                callback(err, connection);
            });
        },

        function createIndexUsers(connection, callback) {
            r.table('users').indexList().contains('createdAt').do(function(hasIndex) {
                return r.branch(
                        hasIndex,
                        {created: 0},
                        r.table('users').indexCreate('createdAt')
                        );
            }).run(connection, function(err) {
                callback(err, connection);
            });
        },

        function waitForIndexTodos(connection, callback) {
            // Wait for the index to be ready.
            r.table('todos').indexWait('createdAt').run(connection, function(err, result) {
                callback(err, connection);
            });
        },

        function waitForIndexUsers(connection, callback) {
            r.table('users').indexWait('createdAt').run(connection, function(err, result) {
                callback(err, connection);
            });
        }
    ],
            function(err, connection) {
                if(err) {
                    console.error(err);
                    process.exit(1);
                    return;
                }
            });
};

Upvotes: 1

Related Questions