Fereshteh Rabet

Reputation: 201

How to bulk insert documents into Elasticsearch without updating documents that already exist

I'm using Elasticsearch with the NEST library. How can I bulk insert documents into Elasticsearch without updating a document when it already exists?

Upvotes: 0

Views: 3430

Answers (1)

Russ Cam

Reputation: 125498

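Here's an example of a bulk API call that performs create operations. A create operation fails for any document whose id already exists in the index, so existing documents are left untouched: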

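using System;
using Elasticsearch.Net;
using Nest;

private static void Main()
{
    var defaultIndex = "documents";
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
    var settings = new ConnectionSettings(pool)
        .DefaultIndex(defaultIndex);

    var client = new ElasticClient(settings);

    // start from a clean index so the example is repeatable
    if (client.IndexExists(defaultIndex).Exists)
        client.DeleteIndex(defaultIndex);

    // index a document with Id 1 up front, so that it already exists
    // by the time the bulk call runs
    client.Index(new MyDocument(1) 
    { 
        Message = "new" 
    }, i => i.Refresh(Refresh.WaitFor));

    // Id 1 collides with the existing document; Ids 2 and 3 are new
    var documents = new [] 
    {
        new MyDocument(1) { Message = "updated" },
        new MyDocument(2) { Message = "updated" },
        new MyDocument(3) { Message = "updated" },
    };

    // "create" operations fail for documents that already exist
    client.Bulk(b => b
        .CreateMany(documents)
        .Refresh(Refresh.WaitFor)
    );

    var getResponse = client.Get<MyDocument>(1);

    // the existing document still has its original message
    Console.WriteLine(getResponse.Source.Message == "new");
}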

public class MyDocument 
{
    public MyDocument(int id) => Id = id;

    public int Id { get; set; }  

    public string Message { get; set; }
}

The output will be true, meaning the document with Id 1 was not created by the bulk call because it already existed. If you take a look at the bulk response, it'll be an HTTP 200 response similar to

{
  "took" : 1387,
  "errors" : true,
  "items" : [
    {
      "create" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "1",
        "status" : 409,
        "error" : {
          "type" : "version_conflict_engine_exception",
          "reason" : "[mydocument][1]: version conflict, document already exists (current version [1])",
          "index_uuid" : "DZIgGMZcSlWRycC1MGhJWQ",
          "shard" : "3",
          "index" : "documents"
        }
      }
    },
    {
      "create" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "2",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    },
    {
      "create" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "3",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

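Importantly, "errors" is true, and the first "create" operation response indicates what the error was: a 409 version_conflict_engine_exception, because the document already exists. The other two operations succeeded with status 201.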
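Because "errors" is true even when the only problem is a document that already exists, you will probably want to tell the expected 409 conflicts apart from genuine failures. Here is a minimal sketch of that check, working directly on the response JSON with System.Text.Json so that it runs without a cluster. The BulkResponseInspector class, the UnexpectedFailures method and the sample JSON (including the made-up 400 for Id 3) are hypothetical names and data for illustration only. With NEST you would more likely read the same information from the bulk response object, for example its Errors and ItemsWithErrors properties, rather than parsing JSON yourself.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class BulkResponseInspector
{
    // Hypothetical helper: returns the ids of bulk items that failed for a
    // reason other than "document already exists" (status 409).
    public static List<string> UnexpectedFailures(string bulkResponseJson)
    {
        var failures = new List<string>();

        using (var doc = JsonDocument.Parse(bulkResponseJson))
        {
            var root = doc.RootElement;

            // "errors" is false when every operation succeeded
            if (!root.GetProperty("errors").GetBoolean())
                return failures;

            foreach (var item in root.GetProperty("items").EnumerateArray())
            {
                // each item is an object with a single property named after
                // the operation, e.g. "create" or "update"
                foreach (var operation in item.EnumerateObject())
                {
                    var status = operation.Value.GetProperty("status").GetInt32();

                    // 409 means the document already exists, which is the
                    // outcome we want here, so it is not treated as a failure
                    if (status == 409)
                        continue;

                    if (status >= 400)
                        failures.Add(operation.Value.GetProperty("_id").GetString());
                }
            }
        }

        return failures;
    }
}

public static class Program
{
    public static void Main()
    {
        // trimmed-down sample response; the 400 for Id 3 is invented
        const string json = @"{ ""errors"": true, ""items"": [
            { ""create"": { ""_id"": ""1"", ""status"": 409 } },
            { ""create"": { ""_id"": ""2"", ""status"": 201 } },
            { ""create"": { ""_id"": ""3"", ""status"": 400 } } ] }";

        var failures = BulkResponseInspector.UnexpectedFailures(json);

        Console.WriteLine(string.Join(", ", failures)); // prints: 3
    }
}
```

Run against the response shown above, this would return an empty list, since the only failed item is the 409 for Id 1.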

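An alternative approach to using .CreateMany(...) would be to use .UpdateMany(...) with an upsert operation, specifying a "no-op" script for the case where the document already exists. The upsert document is indexed only when no document with that id exists; otherwise the script runs and sets ctx.op to 'none', which leaves the existing document unchanged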

client.Bulk(b => b
    .UpdateMany(documents, (d, document) => d
        .Upsert(document)
        .Script(s => s
            .Source("ctx.op = 'none'")
        )
    )
    .Refresh(Refresh.WaitFor)
);

The outcome is the same, that is, the document with Id 1 is not overwritten, but the response is slightly different

{
  "took" : 1307,
  "errors" : false,
  "items" : [
    {
      "update" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "1",
        "_version" : 1,
        "result" : "noop",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "status" : 200
      }
    },
    {
      "update" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "2",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    },
    {
      "update" : {
        "_index" : "documents",
        "_type" : "mydocument",
        "_id" : "3",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

Note that "errors" is now false, and the result of the first "update" operation is "noop".

Upvotes: 4
