Reputation: 291
I have a Kafka topic and a Spark application. The Spark application gets data from the Kafka topic, pre-aggregates it, and stores it in Elasticsearch. Sounds simple, right?
Everything works fine as expected, but the minute I set the "spark.cores" property to something other than 1, I start getting
version conflict, current version [2] is different than the one provided [1]
After researching a bit, I think the error occurs because multiple cores can hold the same document at the same time. Thus, when one core finishes its part of the aggregation and tries to write back to the document, it gets this error.
TBH, I am a bit surprised by this behaviour because I thought Spark and ES would handle this on their own. This leads me to believe that maybe, there is something wrong with my approach.
How can I fix this? Is there some sort of "synchronized" or "lock" sort of concept that I need to follow?
Cheers!
Upvotes: 2
Views: 8195
Reputation: 8204
It sounds like you have several messages in the queue that all update the same ES document, and these messages are being processed concurrently. There are two possible solutions:
First, you can use Kafka partitions to ensure that all the messages that update the same ES document are handled in sequence. This assumes that there’s some property in your message that Kafka can use to determine how messages map to ES documents.
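The idea is that Kafka routes all messages with the same key to the same partition, and a partition is consumed in order. Here is a minimal sketch of key-based partition assignment; the real Kafka client uses murmur2 hashing, so CRC32 below is a stand-in to illustrate the principle, not the exact algorithm:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Kafka's default partitioner hashes the message key so that all
    # messages with the same key land in the same partition and are
    # therefore consumed sequentially. (Real clients use murmur2;
    # CRC32 here is only an illustrative stand-in.)
    return zlib.crc32(key) % num_partitions

# If the key is the ES document id, concurrent updates to the same
# document can no longer race across partitions.
p1 = partition_for(b"doc-42", 8)
p2 = partition_for(b"doc-42", 8)
```

Since `p1 == p2` always holds for the same key, two updates to `doc-42` can never be processed in parallel by consumers of different partitions.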
The other way is the standard way of handling optimistic concurrency conflicts: retry the transaction. If you have some data from a Kafka message that you need to add to an ES document, and the current document in ES is version 1, then you can try to update it and save back version 2. But if someone else already wrote version 2, you can retry by using version 2 as a starting point, adding your new data, and saving version 3.
If either of these approaches destroys the concurrency you were expecting to get from Kafka and Spark, then you may need to rethink your approach. You may have to introduce a new processing stage that does some heavy lifting but doesn’t actually write to ES, then do the ES updates in a separate step.
Upvotes: 5
Reputation: 291
I would like to answer my own question. In my use case, I was updating a document counter. So all I had to do was retry whenever a conflict arose, because I just needed to aggregate my counter.
My use case was somewhat this:
For many uses of partial update, it doesn’t matter that a document has been changed. For instance, if two processes are both incrementing the page-view counter, it doesn’t matter in which order it happens; if a conflict occurs, the only thing we need to do is reattempt the update.
This can be done automatically by setting the retry_on_conflict parameter to the number of times that update should retry before failing; it defaults to 0.
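If you write from Spark through the elasticsearch-hadoop connector, the equivalent knob is the `es.update.retry.on.conflict` setting. A minimal configuration fragment might look like this (the value 3 is an arbitrary example; tune it for your workload):

```
es.write.operation = update
es.update.retry.on.conflict = 3
```

With this set, the connector retries a conflicting partial update instead of failing the task.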
Thanks to Willis and this blog, I was able to configure the Elasticsearch settings, and now I am not having any problems at all.
Upvotes: 2