Reputation: 1
I am trying to index some documents using the BulkAll method as follows:
var waitHandle = new CountdownEvent(1);

var bulkAll = _client.BulkAll(elementList, b => b
    .Index(indexName)
    .BackOffRetries(15)
    .BackOffTime(TimeSpan.FromSeconds(55))
    .RefreshOnCompleted()
    .MaxDegreeOfParallelism(4)
    .Size(500));

bulkAll.Subscribe(observer: new BulkAllObserver(
    onNext: (b) =>
    {
        _logger.Debug("Indexed group of documents");
    },
    onError: (e) =>
    {
        _logger.Error(e, e.Message);
        throw e;
    },
    onCompleted: () =>
    {
        waitHandle.Signal();
    }));

waitHandle.Wait();
The problem is that once the signal is sent inside the onCompleted handler, I have more documents in my index than expected, the difference being a multiple of the Size parameter. I assume it fails to index a batch of documents, retries the operation, and creates duplicates.
I have tried to check for errors by putting a breakpoint inside the onError handler, but it is never hit.
Is there any way to avoid those duplicates? Or at least to remove them once the indexing process has completed?
I create the Elasticsearch client as follows:
var settings = new ConnectionSettings(
    new StaticConnectionPool(_infrastructureSettings.ElasticServerUrls));
settings.BasicAuthentication(_infrastructureSettings.ElasticsearchUsername, _infrastructureSettings.ElasticsearchPassword);
settings.DisableDirectStreaming();
settings.MaximumRetries(15);
settings.RequestTimeout(TimeSpan.FromMinutes(4));

var client = new ElasticClient(settings);
Upvotes: 0
Views: 1144
Reputation: 125528
"I assume it fails to index a batch of documents, retries the operation, and creates duplicates."

BulkAll never retries indexing documents that have been successfully indexed. If each document has an "id" property/field, it will be used as the "_id" for the document, which avoids indexing the same document twice: a subsequent index operation with the same id overwrites the existing document instead of creating a new one.
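As a sketch of that approach (the Element type and its properties are hypothetical, not taken from the question): either give each document an Id property, which NEST infers as the "_id", or set the id explicitly per operation via the BufferToBulk overload on the BulkAll descriptor, so that a retried batch overwrites rather than duplicates:

```csharp
// Hypothetical document type; NEST infers the Id property as the "_id".
public class Element
{
    public string Id { get; set; }
    public string Name { get; set; }
}

// Assuming NEST's BufferToBulk, which lets you control how each buffered
// batch is turned into bulk operations — here, setting the "_id" explicitly:
var bulkAll = _client.BulkAll(elementList, b => b
    .Index(indexName)
    .Size(500)
    .BufferToBulk((descriptor, buffer) => descriptor
        .IndexMany(buffer, (operation, doc) => operation.Id(doc.Id))));
```

With a deterministic id in place, a batch that is retried after a partial failure re-indexes the same documents under the same "_id"s, so the document count stays correct.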
Upvotes: 2