Reputation: 3
We have a system that synchronises data from MongoDB to Elasticsearch. Here are the key components:
Version of the ELK stack being used: 8.7.1
Problem: We have a use case where the following operations are performed in MongoDB in sequential order:
While we can see these operations are processed sequentially in Logstash, we're encountering an issue: the deletion of Document A is not reflected in Elasticsearch. The _version on the document in Elasticsearch is 3, indicating all 3 events were applied. This suggests that the delete operation might be processed before an update operation, causing Document A to remain in Elasticsearch despite having been deleted in MongoDB.
The logstash pipeline is as follows:
input {
  kafka {
    id => "my_plugin_id"
    group_id => "logstash"
    bootstrap_servers => "broker:29092"
    topics => ["topic"]
    auto_offset_reset => "earliest"
    consumer_threads => 1
  }
}
filter {
  json {
    source => "message"
    target => "message"
    add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
  }
}
output {
  if [message][operationType] == "delete" {
    elasticsearch {
      hosts => "http://es01:9200"
      user => "elastic"
      password => "changeme"
      index => "index_name"
      document_id => "%{[mongoId]}"
      action => "delete"
    }
  }
  else {
    elasticsearch {
      hosts => "http://es01:9200"
      index => "index_name"
      document_id => "%{[mongoId]}"
      user => "elastic"
      password => "changeme"
      pipeline => "index_pipeline"
    }
  }
}
Note: As mentioned in the above config, for actions other than delete we use an ingest pipeline that restructures the document data before it is indexed. The document_id is set to mongoId.
One hypothesis, though subject to verification: to the best of my knowledge, the Elasticsearch output plugin (used in the Logstash pipeline) uses the bulk API to send data to Elasticsearch. Could it be that events (sub-requests) processed in a single batch don't adhere to a strict ordering, i.e. the delete may run (or complete) before the update event, so that the final document visible in Elasticsearch corresponds to the update operation?
The index refresh interval is set to the default, i.e. once per second.
I also increased pipeline.batch.delay (a Logstash pipeline setting) to 5000ms (the default is 50ms) to make sure that all the events end up in the same batch.
Upvotes: 0
Views: 255
Reputation: 217544
Having pipeline.workers set to 1 and ordered set to true ensures that your Logstash pipeline processes documents one by one. The events also arrive one by one in the output layer.
However, since you have two different elasticsearch output plugins, they operate independently: each can send its batch at an arbitrary moment, depending on how full its buffer is or how long it's been since it was last flushed.
So what probably happens in your case is that the delete batch is sent first and does nothing (since there's no document with that ID yet), and then the create/update batch is sent next, effectively creating the document.
What you should probably do is set a temporary action field (e.g. [@metadata][action]) to index, update or delete inside the filter section, and use that value in a single elasticsearch output, like this:
output {
  elasticsearch {
    hosts => "http://es01:9200"
    user => "elastic"
    password => "changeme"
    index => "index_name"
    document_id => "%{[mongoId]}"
    action => "%{[@metadata][action]}"
    pipeline => "index_pipeline"
  }
}
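The filter logic that populates [@metadata][action] isn't shown above; here is a minimal sketch of how it could look, assuming your MongoDB change stream events carry the standard operationType values (insert, update, replace, delete) and the same json filter as in your config:

```
filter {
  json {
    source => "message"
    target => "message"
    add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
  }
  if [message][operationType] == "delete" {
    mutate { add_field => { "[@metadata][action]" => "delete" } }
  } else {
    # insert/update/replace all (re)index the full document
    mutate { add_field => { "[@metadata][action]" => "index" } }
  }
}
```

Since fields under [@metadata] are never serialized into the event that gets indexed, the action field won't pollute your documents in Elasticsearch.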
The pipeline setting is applied at the bulk-request level, so it won't have any effect on delete operations anyway; there shouldn't be any issue there. If there is one, simply remove the pipeline setting from the output and set it directly in your index settings via the index.default_pipeline property.
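For reference, setting the default pipeline on the index would look like this (a sketch using the index and pipeline names from your config):

```
PUT index_name/_settings
{
  "index.default_pipeline": "index_pipeline"
}
```

With that in place, any indexing request that doesn't specify a pipeline explicitly will go through index_pipeline, and you can drop the pipeline option from the Logstash output entirely.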
Try it out.
Upvotes: 0