anthozep

Reputation: 333

Logstash doc_as_upsert cross index in Elasticsearch to eliminate duplicates

I have a logstash configuration that uses the following in the output block in an attempt to mitigate duplicates.

output {
        if [type] == "usage" {
                elasticsearch {
                        hosts => ["elastic4:9204"]
                        index => "usage-%{+YYYY-MM-dd-HH}"
                        document_id => "%{[@metadata][fingerprint]}"
                        action => "update"
                        doc_as_upsert => true
                }

        }
}

The fingerprint is calculated from a SHA1 hash of two unique fields.
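For reference, a fingerprint like this is typically produced with Logstash's fingerprint filter. The sketch below assumes two placeholder field names (`field_a`, `field_b`) and an arbitrary HMAC key; substitute your own:

```
filter {
    fingerprint {
        # placeholder field names -- replace with your two unique fields
        source => ["field_a", "field_b"]
        concatenate_sources => true
        method => "SHA1"
        # arbitrary key; SHA1 here is an HMAC, so any constant string works
        key => "dedup"
        # store the hash in metadata so it never gets indexed itself
        target => "[@metadata][fingerprint]"
    }
}
```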

This works when logstash sees the same doc in the same index, but since the command that generates the input data doesn't produce documents at a reliable rate, logstash will sometimes insert duplicate docs into a different date-stamped index.

For example, the command that logstash runs to get the input generally returns the last two hours of data. However, since I can't definitively tell when a doc will appear/disappear, I run the command every fifteen minutes.

This is fine when the duplicates occur within the same hour. However, when the hour or day date stamp rolls over, and the document still appears, elastic/logstash thinks it's a new doc.

Is there a way to make the upsert work cross index? These would all be the same type of doc; they would simply apply to every index that matches "usage-*".

Upvotes: 3

Views: 1372

Answers (1)

Val

Reputation: 217254

A new index is an entirely new keyspace, and there's no way to tell ES not to index two documents with the same ID in two different indices.

However, you could prevent this by adding an elasticsearch filter to your pipeline which would look up the document in all indices and if it finds one, it could drop the event.

Something like this would do (note that usages would be an alias spanning all usage-* indices):

filter {
    elasticsearch {
        hosts => ["elastic4:9204"]
        index => "usages"
        query => "_id:%{[@metadata][fingerprint]}"
        fields => {"_id" => "other_id"}
    }
    # if the document was found, drop this one
    if [other_id] {
        drop {}
    }
}
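For the `usages` alias this filter relies on, one way to keep it covering every `usage-*` index (including ones that don't exist yet) is an index template, so each newly created time-stamped index joins the alias automatically. A sketch using the legacy template API (the template name `usage` is arbitrary):

```
PUT _template/usage
{
  "index_patterns": ["usage-*"],
  "aliases": {
    "usages": {}
  }
}
```

Note that the alias only needs to be readable; the elasticsearch filter queries it, while the output plugin keeps writing to the concrete date-stamped indices.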

Upvotes: 2
