Reputation: 53
We are using a JavaInputDStream<ConsumerRecord<String, String>> to read messages (value: JSON string) from Apache Kafka, join them with some OracleDB data and write the result to ElasticSearch.
We implemented our offset management as described in the Spark Streaming - Kafka Integration Guide, but we just realized that it is not working for us: the stream does not read a message again if there is a failure in the current mini-batch. Even if we skip this line, it does not re-read the message:
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
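For reference, the offset-handling pattern described in the integration guide looks roughly like this (a minimal sketch; the processing step is a placeholder and stream is the JavaInputDStream mentioned above):

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// Read the offset ranges first, process the batch, and commit the offsets
// asynchronously only after the batch has been processed successfully.
stream.foreachRDD(recordRDD -> {
    final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();

    // ... join with the OracleDB data and write to ElasticSearch ...

    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});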
We broke our code down to the following and expected the stream to end up in a loop, reading the same messages again and again, but it doesn't:
stream.foreachRDD(recordRDD -> {
    final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
    if (!recordRDD.isEmpty()) {
        LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
    }
});
The consumer config parameter enable.auto.commit is set to false, which is also shown in the log after initializing the JavaInputDStream. We're facing the same problem with our embedded Kafka broker in tests and with our Kafka server on the dev stage; both currently run in standalone mode.
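For completeness, a sketch of how such a stream is typically created with this setting (broker address, group id and topic name are placeholders, and javaStreamingContext is the existing streaming context):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");      // placeholder
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "our-consumer-group");           // placeholder
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);                 // offsets are committed manually

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        javaStreamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(
            Arrays.asList("our-topic"), kafkaParams));         // placeholder topic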
None of the things we tried worked, and it seems like we searched the whole web without finding the reason for our problem. It seems like the stream is ignoring the enable.auto.commit config and simply commits after reading the messages of the current RDD. Whatever we try, our stream just keeps reading every message exactly once.
Are there any different approaches or facts I am missing?
Upvotes: 3
Views: 1111
Reputation: 53
After some more tests we found out that the manual commit only makes a difference if the stream is stopped or crashes during the actual batch: when the stream is stopped and started again, it consumes the failed data again.
So what we are doing at the moment is stopping the stream directly whenever we detect a failure:
javaStreamingContext.stop(false)
After that the stream is started again by a scheduler, which checks at regular intervals whether the stream is alive and starts it if it isn't.
This is not an elegant solution, but it works for us for now.
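In outline, the workaround looks roughly like this (a sketch only; the failure handling, the check interval and the restartStream() helper are simplified or hypothetical):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.spark.streaming.StreamingContextState;

// Inside the batch processing: stop the streaming context (but keep the
// SparkContext) as soon as a failure is detected, before the offsets for
// this batch would be committed.
stream.foreachRDD(recordRDD -> {
    try {
        // ... process the batch ...
    } catch (Exception e) {
        LOGGER.error("Batch failed, stopping the stream", e);
        javaStreamingContext.stop(false);   // false = keep the SparkContext alive
    }
});

// A simple watchdog that periodically checks whether the stream is still
// running and restarts it if it is not.
ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
watchdog.scheduleAtFixedRate(() -> {
    if (javaStreamingContext.getState() == StreamingContextState.STOPPED) {
        // Hypothetical helper: a stopped context cannot be restarted, so this
        // would have to build a new streaming context and stream and start it.
        restartStream();
    }
}, 1, 1, TimeUnit.MINUTES);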
Upvotes: 2