prixa

Reputation: 100

kafka-connect-elasticsearch: When using "write.method" upsert, is it possible to use the same Avro object on the Kafka topic to send a partial document?

I am trying to use "write.method" upsert with the Elasticsearch (ES) Kafka connector. From my Kafka Streams application I write the document that I want to upsert to a Kafka topic that the ES connector is configured to read from. I am using Avro objects as the Kafka values on this topic. My document's Avro definition looks like this:

{
  "type": "record",
  "name": "Document",
  "fields": [
    {
      "name": "id",
      "type": ["null", "string"],
    },
    {
      "name": "name",
      "type": ["null", "string"]
    },
    {
      "name": "address",
      "type": ["null", "string"]
    }
  ]
}

Sometimes the document contains only id and name, and other times it contains just address. id and name get overwritten when I send just address, and vice versa. I have set behavior.on.null.values to ignore, hoping that the ES connector would ignore the null id and name values, but that doesn't work as expected.
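For context, the relevant part of my connector configuration looks roughly like this (connector name, topic, URLs and converter settings are illustrative):

{
  "name": "es-sink-documents",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "documents",
    "connection.url": "http://localhost:9200",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.ignore": "false",
    "write.method": "upsert",
    "behavior.on.null.values": "ignore"
  }
}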

However, when I use two different Avro objects on my Kafka topic, the first containing only id and name and the other containing only address, the upsert behavior is as expected. But for the same Kafka topic to allow multiple Avro object definitions, I have to set the topic's schema compatibility mode to NONE, which isn't ideal.

What's the right way to solve the issue at hand?

Upvotes: 2

Views: 1781

Answers (1)

Robin Moffatt

Reputation: 32090

The setting behavior.on.null.values = ignore simply tells the connector that if it gets a message whose entire value is null (i.e. a tombstone message), it should ignore that message. The other options are to fail, or to delete the target document in Elasticsearch matching the key of the tombstone.
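To illustrate, a tombstone is nothing more than a record with a key and a null value; from a plain Java producer (the topic and key here are just examples, and producer is assumed to be an existing KafkaProducer instance) it would be:

producer.send(new ProducerRecord<>("documents", "some-document-id", null));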

The connector does not support the behaviour you describe of partial updates. It can insert, update, or delete, but only entire documents.

If you want partial upsert behaviour then you'd need to implement this yourself, either in a custom connector, or by storing state in your Kafka Streams application so that you can emit a complete record each time a delta comes through.
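Here's a rough sketch of the Kafka Streams option. It assumes the record key is the document id, that Document is the class generated from your Avro schema, and that default Serdes (e.g. SpecificAvroSerde) are configured in the streams properties; the topic and store names are made up:

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class DocumentMergeTopology {

    // Folds partial documents (keyed by document id) into a complete document
    // held in a state store, and emits the merged record to the topic that the
    // Elasticsearch sink connector reads from.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, Document>stream("partial-documents")
            .groupByKey()
            .aggregate(
                Document::new,  // empty document as the initial state
                (id, partial, merged) -> {
                    // Copy only the fields present in this delta; leave
                    // previously seen values untouched.
                    merged.setId(id);
                    if (partial.getName() != null) {
                        merged.setName(partial.getName());
                    }
                    if (partial.getAddress() != null) {
                        merged.setAddress(partial.getAddress());
                    }
                    return merged;
                },
                Materialized.<String, Document, KeyValueStore<Bytes, byte[]>>as("document-store"))
            .toStream()
            .to("documents");

        return builder.build();
    }
}

Each incoming delta then yields a complete record on the output topic, so the connector only ever writes whole documents and nothing gets nulled out.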

Edit: partial updates are in fact possible with write.method=upsert.
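As far as I recall, upsert mode issues an Elasticsearch update with doc_as_upsert, so a partial document is merged into the existing one rather than replacing it; roughly this request (index and document id are illustrative):

POST /documents/_update/some-document-id
{
  "doc": { "address": "1 Main St" },
  "doc_as_upsert": true
}

Bear in mind that a field which is explicitly null in the message will still overwrite the stored value, which is why a single schema that always carries id, name and address (even as nulls) behaves differently from a message containing only the fields you want to change.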

Upvotes: 5
