Gnana

Reputation: 2230

Spark streaming checkpoint

I am reading messages from Kafka using Spark Kafka direct streaming. I want zero message loss: after Spark restarts, it has to read the messages it missed from Kafka. I am using a checkpoint to save all read offsets, so that next time Spark will start reading from the stored offsets. That is my understanding.

I have used the code below. I stopped my Spark job and pushed a few messages to Kafka. After the restart, Spark does not read the missed messages; it reads only the latest messages from Kafka. How can I read the missed messages from Kafka?

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// spark is the existing SparkSession
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
ssc.checkpoint("C:/cp")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val msgStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
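
For reference, checkpoint-based recovery in Spark Streaming is normally driven through StreamingContext.getOrCreate, which rebuilds the context (including the DStream graph) from the checkpoint directory when one exists and only calls the factory function otherwise. A minimal sketch of that pattern, assuming the same checkpoint directory and batch interval as above:

def createContext(): StreamingContext = {
  // Only called when no checkpoint exists yet
  val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
  ssc.checkpoint("C:/cp")
  // set up the direct stream and all processing here so it can be restored from the checkpoint
  ssc
}

val context = StreamingContext.getOrCreate("C:/cp", createContext _)
context.start()
context.awaitTermination()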

Note: the application logs show auto.offset.reset set to none instead of latest. Why?

WARN KafkaUtils: overriding auto.offset.reset to none for executor

SBT

scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"

Windows: 7

Upvotes: 0

Views: 5430

Answers (2)

USY

Reputation: 61

I would suggest not relying on checkpointing; instead, use an external data store to save your processed Kafka message offsets. Please follow this link to get some insight: https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/
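
A rough sketch of that approach, assuming the ssc and kafkaParams from the question; loadOffsets and saveOffsets are hypothetical stand-ins for whatever external store you pick (a database table, ZooKeeper, etc.), not library calls:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hypothetical helpers backed by your external store (not part of any library)
def loadOffsets(): Map[TopicPartition, Long] = ???        // last processed offset per partition
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???   // persist the offsets just processed

// Start the stream exactly where the external store says processing stopped
val fromOffsets = loadOffsets()
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  saveOffsets(offsetRanges)  // store offsets only after processing succeeds
}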

Upvotes: 0

KiranM

Reputation: 1323

If you want to read the missed messages, use the commit process instead of checkpointing.

Note that Spark can't read old messages with this property:

"auto.offset.reset" -> "latest"

Try this:

val kafkaParams = Map[String, Object](
  //...
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
  //...
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  //Your processing goes here

  //Then commit after completing your process.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

With commitAsync, the offsets are stored in Kafka against your group.id, so on the next restart the stream resumes from the last committed offset; auto.offset.reset only applies when there is no committed offset for the group. Hope this helps.

Upvotes: 0
