Richard G
Richard G

Reputation: 5683

How to wait until all bulk writes have completed in the Elasticsearch API

I'm using the Node.js Elasticsearch client to write a data importer that bulk-imports documents from MongoDB. The problem is that the index refresh doesn't seem to wait until all documents have been written to Elasticsearch before the counts are checked.

I use the Node streams API to read the records into batches, then the Elasticsearch bulk API to write each batch, as shown below:

/**
 * Rebuilds the search index for one model: deletes the model's type, then
 * bulk-indexes every document read from a MongoDB stream.
 *
 * Documents are sent in batches of `settings.indexBatchSize`. `done` is called
 * only after the stream has ended AND every bulk request has been answered, so
 * the caller can refresh/count the index immediately afterwards.
 *
 * @param modelName   Mongoose model name; lower-cased and used as the `_type`.
 * @param queryStream A query whose `.stream()` is read (when `openStream` is
 *                    falsy), an already-open stream (when `openStream` is
 *                    truthy), or falsy to stream every document of the model.
 * @param openStream  Truthy when `queryStream` is already a stream.
 * @param done        Callback `done(err)`. `err` is the client/stream error, or
 *                    the bulk response when it reports item errors.
 */
function rebuildIndex(modelName, queryStream, openStream, done) {
  logger.debug('Rebuilding %s index', modelName);
  async.series([
    function (next) {
      deleteType(modelName, function (err, result) {
        next(err, result);
      });
    },
    function (next) {
      var batchSize = settings.indexBatchSize;
      var batch = [];
      var docCount = 0;
      var pending = 0;      // bulk requests sent but not yet answered
      var ended = false;    // the stream has emitted "end"
      var finished = false; // next() has already been called
      var stream;

      if (queryStream && !openStream) {
        stream = queryStream.stream();
      } else if (queryStream && openStream) {
        stream = queryStream;
      } else {
        stream = mongoose.model(modelName).find({}).stream();
      }

      // Calls next() exactly once, whichever outcome (success, bulk error,
      // stream error) happens first.
      function finish(err) {
        if (finished) {
          return;
        }
        finished = true;
        if (err) {
          logger.error(err, 'Failed to rebuild index');
        } else {
          logger.debug('Completed rebuild of %s index', modelName);
        }
        next(err);
      }

      // Sends the current batch and tracks the request until its callback fires.
      function sendBatch() {
        var body = batch;
        batch = [];
        pending++;
        client().bulk({
          body: body
        }, function (err, resp) {
          pending--;
          if (err) {
            finish(err);
          } else if (resp.errors) {
            finish(resp);
          } else if (ended && pending === 0) {
            // Last outstanding request of a fully-read stream. Earlier batches
            // can answer after later ones, so completion must wait for all of them.
            finish();
          }
        });
      }

      stream.on("data", function (doc) {
        if (finished) {
          return; // an earlier request failed; don't queue more work
        }
        logger.debug('indexing %s', doc.userType);
        batch.push({
          index: {
            "_index": settings.index,
            "_type": modelName.toLowerCase(),
            "_id": doc._id.toString()
          }
        });
        var obj = _.clone(doc.toObject ? doc.toObject() : doc);
        delete obj._id;
        batch.push(obj);
        docCount++;
        if (docCount % batchSize === 0) {
          console.log(chalk.green('Loaded %s records'), docCount);
          sendBatch();
        }
      });

      stream.on("error", finish);

      // When the stream ends, write the remaining records, then report
      // completion only once no bulk request is still outstanding.
      stream.on("end", function () {
        ended = true;
        if (finished) {
          return;
        }
        if (batch.length > 0) {
          console.log(chalk.green('Loaded %s records'), batch.length / 2);
          sendBatch();
        }
        if (pending === 0) {
          finish();
        }
      });
    }
  ],
  function (err) {
    if (err) {
      logger.error(err);
    }
    done(err);
  });
}

I use this helper to check the document counts in the index. Without the timeout, the counts in the index are wrong, but with the timeout they're okay.

/**
   * Reports how many documents of one type are currently in the search index.
   *
   * Steps, run in order: wait 1500 ms, refresh the index, then count documents
   * whose type is the lower-cased `type`. A 404 from the count is not treated
   * as a client error because of `ignore: [404]`.
   * NOTE(review): the fixed delay is a timing workaround. It should be unnecessary
   * once the indexing code reports completion only after all bulk requests answer.
   * @param type The type, e.g. User, Customer etc.
   * @param done Callback `done(err, count)`; `count` is the `count` field of the
   *             count response (undefined when a step failed).
   */
  function checkCount(type, done) {
    function pause(next) {
      setTimeout(next, 1500);
    }

    function refresh(next) {
      refreshIndex(next);
    }

    function countDocuments(next) {
      client().count({
        "index": settings.index,
        "type": type.toLowerCase(),
        "ignore": [404]
      }, function (error, response) {
        if (error) {
          return next(error);
        }
        next(error, response.count);
      });
    }

    async.series([pause, refresh, countDocuments], function (err, results) {
      if (err) {
        logger.error({"err": err}, "Could not check index counts.");
      }
      // results[2] holds the value produced by countDocuments.
      done(err, results[2]);
    });
  }

And this helper is supposed to refresh the index after the update completes:

// required to get results to show up immediately in tests. Otherwise there's a 1 second delay
  // between adding an entry and it showing up in a search.
  /**
   * Refreshes `settings.index` so that documents indexed so far are visible to
   * search and count requests. A 404 is not reported as a client error
   * (`ignore: [404]`).
   * NOTE(review): a refresh only exposes writes Elasticsearch has already
   * received; it does not wait for bulk requests that are still in flight.
   * @param done Callback `done(err)`; called with no arguments on success.
   */
  function refreshIndex(done) {
    client().indices.refresh({
      "index": settings.index,
      "ignore": [404]
    }, function (error) {
      if (error) {
        done(error);
      } else {
        // Previously logged "deleted index", which misdescribed this operation.
        logger.debug("refreshed index");
        done();
      }
    });
  }

The loader works okay, except this test fails because of timing between the bulk load and the count check:

it('should be able to rebuild and reindex customer data', function (done) {
    // A regular function (not an arrow) is required so `this` is the mocha context.
    this.timeout(0); // otherwise the stream reports a timeout error
    logger.debug("Testing the customer reindexing process");

    // pass null instead of queryStream to use the generic find-all query
    searchUtils.rebuildIndex("Customer", queryStream, false, function (rebuildErr) {
      // Fail fast on a broken rebuild instead of counting a partial index.
      if (rebuildErr) {
        return done(rebuildErr);
      }
      searchUtils.checkCount("Customer", function (err, count) {
        th.checkSystemErrors(err, count);
        count.should.equal(volume.totalCustomers);
        done();
      });
    });
  });

The counts in the tests vary randomly from run to run. With the artificial delay (the setTimeout in the checkCount function), the counts match, so I conclude that the documents are eventually written to Elasticsearch and the test would pass. I thought indices.refresh would essentially force a wait until all the documents are written to the index, but that doesn't seem to work with this approach.

The setTimeout hack isn't sustainable once the volume reaches production levels, so how can I ensure the bulk calls have been completely written to the Elasticsearch index before checking the document count?

Upvotes: 11

Views: 9584

Answers (2)

Troy
Troy

Reputation: 1839

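Take a look at the `refresh` parameter of the bulk API (see the Elasticsearch documentation). With `refresh: "wait_for"`, Elasticsearch waits until a refresh has made the request's changes visible to search before it replies. A count issued after the callback therefore sees every document in that request.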

For example:

// Bulk action/metadata lines and the documents to index go here, e.g.
// [{ index: { _index: 'myindex', _id: '1' } }, { title: 'doc 1' }, ...]
const bulkUpdatesBody = [];
client.bulk({
  // "wait_for" (Elasticsearch 5.0+) holds the response until a refresh has made
  // these changes visible to search; `true` forces an immediate refresh instead.
  refresh: "wait_for",
  body: bulkUpdatesBody
}, function (err, resp) {
  if (err || resp.errors) {
    // handle the failed request / failed items here
    return;
  }
  // every document in this request is now searchable, so counting is safe
});

Upvotes: 13

Richard G
Richard G

Reputation: 5683

I'm not sure whether this is the answer, but I flushed the index before checking the count. It *appears* to work, but I don't know whether that's just due to the timing between the calls. Perhaps someone from the Elastic team knows whether flushing the index really solves the issue?

/**
 * Flushes and refreshes the search index, then reports how many documents of
 * one type it contains.
 * @param type The type, e.g. User, Customer etc. (lower-cased for the query).
 * @param done Callback `done(err, count)`; `count` is the `count` field of the
 *             count response (undefined when a step failed).
 */
function checkCount(type, done) {
    async.series([
      function (next) {
        // NOTE(review): a flush persists data to disk; search visibility comes
        // from the refresh step below, so the flush is likely not what fixes counts.
        client().indices.flush({
          "index": settings.index,
          "ignore": [404]
        }, function (error) {
          // The flush response carries no document count; only the error matters.
          next(error);
        });
      },
      function (next) {
        // refreshIndex takes a single callback. Passing `type` first made it
        // call the type string as a function, so `next` was never invoked.
        refreshIndex(next);
      },
      function (next) {
        client().count({
          "index": settings.index,
          "type": type.toLowerCase(),
          "ignore": [404]
        }, function (error, count) {
          if (error) {
            next(error);
          } else {
            next(error, count.count);
          }
        });
      }
    ], function (err, results) {
      if (err)
        logger.error({"err": err}, "Could not check index counts.");
      // results[2] is the value produced by the count step.
      done(err, results[2]);
    });
  }

Upvotes: 0

Related Questions