Reputation: 2230
I am reading messages from Kafka using Spark Kafka direct streaming. I want to implement zero message loss and after restarts spark, it has to read the missed messages from Kafka. I am using checkpoint to save all read offset, so that next time spark will start read from stored offset. this is my understanding.
I have used below code. I stopped my spark and pushed few message to Kafka. After restart the spark which is not reading missed messages from Kafka. Spark reads latest messages from kafka. How to read the missed message from Kafka?
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
ssc.checkpoint("C:/cp")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
val msgStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
Note: Application logs shows auto.offset.reset to none instead of latest. why ?
WARN KafkaUtils: overriding auto.offset.reset to none for executor
SBT
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"
Windows : 7
Upvotes: 0
Views: 5430
Reputation: 61
I would rather suggest not to rely on checkpointing instead you can use an external data store to save your processed Kafka message offset.Please follow the link to get some insight. https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/
Upvotes: 0
Reputation: 1323
If you want to read missed out messages, try commit process instead of checkpoint.
Please understand, Spark can't read old messages with property:
"auto.offset.reset" -> "latest"
Try this:
val kafkaParams = Map[String, Object](
//...
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
//...
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//Your processing goes here
//Then commit after completing your process.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Hope this helps.
Upvotes: 0