void

Reputation: 2543

Spark Streaming Direct Kafka API, OffsetRanges: How to handle the first run

My Spark Streaming application reads from Kafka using the direct stream approach, without the help of ZooKeeper. I would like to handle failures so that the application keeps exactly-once semantics. I am following this for reference. Everything looks fine except for:

val stream: InputDStream[(String,Long)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Long)](
      ssc, kafkaParams, fromOffsets,
      // we're just going to count messages per topic, don't care about the contents, so convert each message to (topic, 1)
      (mmd: MessageAndMetadata[String, String]) => (mmd.topic, 1L))

On the very first run of the application no offsets have been stored yet, so what value should I pass for the fromOffsets Map parameter? I am certainly missing something.

Thanks and appreciate any help!

Upvotes: 2

Views: 909

Answers (1)

Cody Koeninger

Reputation: 611

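The first available offset isn't necessarily 0L. Depending on how long the topics have existed, Kafka's retention policy may already have deleted the oldest messages, so the earliest offset can be well above zero.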

I personally just pre-insert the appropriate offsets into the database separately, before the first run. Then the Spark job reads the offsets from the database at startup, exactly as it does on every later run.

The file KafkaCluster.scala in the Spark Kafka integration has methods (for example getEarliestLeaderOffsets) that make it easier to query Kafka for the earliest available offset. That class was private, but has been made public in the most recent Spark code.
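
As a rough sketch of how the startup logic could look (not code from the linked project): prefer whatever offset a previous run saved, and fall back to the earliest offset Kafka still has when nothing is stored for a partition. `TopicPartitionKey` and `StartingOffsets.resolve` are hypothetical names invented for this illustration. In a real job the keys would be `kafka.common.TopicAndPartition`, the `stored` map would come from your database, and the `earliest` map from a Kafka lookup such as the one mentioned above.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition, so this sketch
// compiles without Kafka or Spark on the classpath.
final case class TopicPartitionKey(topic: String, partition: Int)

object StartingOffsets {

  /** Picks the offset to start from for every partition Kafka reports.
    *
    * @param stored   offsets saved by a previous run (empty on the first run)
    * @param earliest earliest offset still available in Kafka, per partition
    * @return one starting offset per partition listed in `earliest`
    */
  def resolve(
      stored: Map[TopicPartitionKey, Long],
      earliest: Map[TopicPartitionKey, Long]): Map[TopicPartitionKey, Long] =
    earliest.map { case (tp, earliestOffset) =>
      // Use the saved offset when there is one; otherwise start at the
      // earliest offset that has not been removed by retention.
      tp -> stored.getOrElse(tp, earliestOffset)
    }
}
```

On the first run `stored` is empty, so `resolve` simply returns the earliest offsets. The result is what you would pass as fromOffsets after converting the keys to `TopicAndPartition`. This sketch does not handle a saved offset that has since aged out of the topic; that case needs its own policy.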

Upvotes: 2
