Reputation: 115
I'm receiving events that end up in Kafka. A Kafka Streams application extracts the id from each event and writes it back to Kafka, in another topic, as a pair (id, 1). I would then like to check whether the id already exists in ElasticSearch: if it does, update its counter; otherwise, create a new record in ElasticSearch with the id from Kafka and the counter set to 1. In other words, an upsert of the record (id, 1) into ES.
I was hoping to use Kafka Connect to ElasticSearch for this, but it does not seem straightforward, if it is possible at all. I can see that adding records to ES works, but I haven't yet found a way to merge with existing records. Is this already possible, and if so, how? If not, is it planned for an upcoming release?
Upvotes: 3
Views: 1219
Reputation: 2066
I forked the datamountaineer ES sink connector to allow upserts. With it, you can specify a PK and run an update with docAsUpsert against ES. You can grab the project and compile the JAR from my GitHub fork.
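For reference, here is a rough sketch of what an update with docAsUpsert looks like at the Elasticsearch Update API level. It is not taken from the connector's code. The helper names, the `events` index, and the `counter` field are assumptions for illustration, and the action line assumes a typeless index (older Elasticsearch versions also expect a `_type`). One thing to keep in mind: `doc_as_upsert` merges the given document into the existing one, so sending (id, 1) sets the counter to 1 rather than incrementing it. If you need the increment, a scripted update with an `upsert` fallback is the usual route, shown alongside.

```python
import json


def doc_as_upsert_body(count):
    """Update body that merges the doc into an existing record,
    or indexes the doc as-is when the id does not exist yet."""
    return {"doc": {"counter": count}, "doc_as_upsert": True}


def scripted_upsert_body(count):
    """Update body that increments the counter of an existing record,
    or indexes the `upsert` document when the id does not exist yet."""
    return {
        "script": {
            "lang": "painless",
            "source": "ctx._source.counter += params.count",
            "params": {"count": count},
        },
        "upsert": {"counter": count},
    }


def bulk_update_lines(index, doc_id, body):
    """Render one update action in the newline-delimited Bulk API format:
    an action line followed by the update body, each terminated by a newline."""
    action = {"update": {"_index": index, "_id": doc_id}}
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"


if __name__ == "__main__":
    # Hypothetical index name and id, only to show the payload shape.
    print(bulk_update_lines("events", "abc", doc_as_upsert_body(1)), end="")
    print(bulk_update_lines("events", "abc", scripted_upsert_body(1)), end="")
```

Either payload can be sent to the `_bulk` endpoint with any HTTP client. Which of the two fits depends on whether the counts are already aggregated upstream (for example in the Streams app) or should be accumulated in ES.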
Upvotes: 3