Aman Yadav

Reputation: 3

Elasticsearch is processing incoming events/messages from Logstash in a non-sequential order

We have a system that synchronises data from MongoDB to Elasticsearch. Here are the key components:

  1. MongoDB Source Connector (Kafka Connect connector): This component reads events from the MongoDB oplog and produces messages on a Kafka topic.
  2. Logstash: Logstash consumes these messages from Kafka and sends them to Elasticsearch. We've configured Logstash with specific settings, including pipeline.workers: 1 and pipeline.ordered: true, to ensure that events are processed in the order they are received (see the settings sketch after this list).
  3. Elasticsearch: a single-node cluster with an index that has a single primary shard.
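
For reference, these ordering-related settings are applied at the pipeline level, roughly like this (a sketch; the pipeline id and config path are placeholders):

# pipelines.yml
- pipeline.id: mongo-to-es
  path.config: "/usr/share/logstash/pipeline/mongo_to_es.conf"
  pipeline.workers: 1
  pipeline.ordered: true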

Version of the ELK stack being used: 8.7.1

Problem: We have a use case where the following operations are performed in MongoDB in this sequence:

  1. Creation of Document A
  2. Update of Document A
  3. Deletion of Document A

While we can see these operations being processed sequentially in Logstash, we're encountering an issue: the deletion of Document A is not reflected in Elasticsearch. The _version of the document in Elasticsearch is 3, indicating that all 3 events were applied. This suggests that the delete operation might be processed before the update operation, causing Document A to remain in Elasticsearch despite having been deleted in MongoDB.

The Logstash pipeline is as follows:

input {
    kafka {
        id => "my_plugin_id"
        group_id => "logstash"
        bootstrap_servers => "broker:29092"
        topics => ["topic"]
        auto_offset_reset => "earliest"
        consumer_threads => 1
    }
}

filter {
    json {
        source => "message"
        target => "message"
        add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
    }
}


output {
    if [message][operationType] == "delete" {
        elasticsearch {
            hosts => "http://es01:9200"
            user => "elastic"
            password => "changeme"
            index => "index_name"
            document_id => "%{[mongoId]}"
            action => "delete"
        }
    }
    else {
        elasticsearch {
            hosts => "http://es01:9200"
            index => "index_name"
            document_id => "%{[mongoId]}"
            user => "elastic"
            password => "changeme"
            pipeline => "index_pipeline"
        }
    }
}

Note: As mentioned in the config above, for actions other than delete we use an ingest pipeline that restructures the document data before it is indexed. The document_id is set to mongoId.
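
For illustration, the ingest pipeline is defined along the following lines; the processor shown here is just a placeholder, the actual processors are specific to our document structure:

PUT _ingest/pipeline/index_pipeline
{
  "description": "Restructure MongoDB change events before indexing (illustrative)",
  "processors": [
    {
      "rename": {
        "field": "message.fullDocument",
        "target_field": "doc",
        "ignore_missing": true
      }
    }
  ]
}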

One hypothesis, though subject to verification: to the best of my knowledge, the Elasticsearch output plugin (used in the Logstash pipeline) uses the bulk API to send data to Elasticsearch. Could it be that the events (sub-requests) processed in a single batch don't adhere to a strict ordering, i.e. the delete may be running (or completing) before the update event, so that the final document visible in Elasticsearch corresponds to the update operation? The index refresh interval is set to the default, i.e. once per second. I also increased pipeline.batch.delay (a Logstash pipeline setting) to 5000 ms (the default is 50 ms) to make sure that all the events end up in the same batch.
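
To illustrate the hypothesis: if all three events ended up in the same batch, the resulting bulk request would contain sub-requests roughly like the following (illustrative payload; document id and fields are placeholders), and the question is whether the delete sub-request is guaranteed to be applied after the index sub-requests:

POST _bulk
{ "index": { "_index": "index_name", "_id": "<mongoId>" } }
{ "field": "value after create" }
{ "index": { "_index": "index_name", "_id": "<mongoId>" } }
{ "field": "value after update" }
{ "delete": { "_index": "index_name", "_id": "<mongoId>" } }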

Upvotes: 0

Views: 255

Answers (1)

Val

Reputation: 217544

Having pipeline.workers set to 1 and pipeline.ordered set to true ensures that your Logstash pipeline processes events one by one, and the events also arrive one by one in the output layer.

However, since you have two different elasticsearch output plugins, they operate independently and each can send its batch at any arbitrary moment, depending on how full its buffer is or how long it's been since the last time it was flushed.

So what probably happens in your case is that the delete batch is sent first and does nothing, since there's no document with that ID yet, and then the create/update batch is sent next, effectively re-creating the document.

What you should probably do is set a temporary action field (e.g. [@metadata][action]) to index, update or delete inside the filter section (a sketch of that filter follows the output below) and use that value in a single elasticsearch output, like this:

output {
    elasticsearch {
        hosts => "http://es01:9200"
        user => "elastic"
        password => "changeme"
        index => "index_name"
        document_id => "%{[mongoId]}"
        action => "%{[@metadata][action]}"
        pipeline => "index_pipeline"
    }
}
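
For the filter part, a minimal sketch could look like the following; the mapping from the connector's operationType values to bulk actions is an assumption that you should adapt to your event format:

filter {
    json {
        source => "message"
        target => "message"
        add_field => { "mongoId" => "%{[message][documentKey][_id][$oid]}" }
    }

    # map the MongoDB operation type to an Elasticsearch bulk action
    if [message][operationType] == "delete" {
        mutate { add_field => { "[@metadata][action]" => "delete" } }
    } else {
        mutate { add_field => { "[@metadata][action]" => "index" } }
    }
}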

Your pipeline parameter is sent at the bulk level, so it won't have any effect on delete operations anyway, and there shouldn't be any issue there. If there is one, simply remove the pipeline setting from the output and set it directly in your index settings as the index.default_pipeline property.
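
For example, something along these lines (index and pipeline names taken from your config; a sketch rather than a definitive command):

PUT index_name/_settings
{
  "index": {
    "default_pipeline": "index_pipeline"
  }
}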

Try it out.

Upvotes: 0
