Reputation: 100
I am trying to use "write.method" upsert for Elasticsearch (ES) kafka connector. From my kafka streams application I am writing my document that I want to upsert, on a kafka topic that ES connector is configured to read from. I am using avro objects as kafka values on this topic. My document's AVRO definition looks like below:
{
"type": "record",
"name": "Document",
"fields": [
{
"name": "id",
"type": ["null", "string"],
},
{
"name": "name",
"type": ["null", "string"]
},
{
"name": "address",
"type": ["null", "string"]
}
]
}
The document contains only id and name at times and other times it contains just address. id and name get overwritten when I just send address and vice versa. I have set behavior.on.null.values
to ignore
hoping that ES connector would ignore the null id and name values, but that doesn't work as expected.
Although when I use two different AVRO objects on my kafka topic, first one containing only id and name, the other one containing only address, upsert mode behavior is as expected. But for the same kafka topic to allow multiple AVRO object definitions, I need to set compatibility mode of the topic to NONE, which isn't ideal.
What's the right way to solve the issue at hand?
Upvotes: 2
Views: 1781
Reputation: 32090
The setting behavior.on.null.values = ignore
simply tells the connector that if it gets a message in which the entire message is null, to ignore that message (other options are to fail, or to delete the target document in Elasticsearch matching the key of the message with the null value, i.e. a tombstone message).
The connector does not support the behaviour you describe of partial updates. It can insert/update/delete, but only entire documents
If you want partial upsert behaviour then you'd need to implement this yourself, either in a custom connector or through storing state in your Kafka Streams application to be able to emit a complete record each time when a delta comes through.
Partial updates are possible with write.method=upsert
Upvotes: 5