ngi

Reputation: 146

Offset Management For Apache Kafka With Apache Spark Batch

I'm writing a Spark (v2.2) batch job that reads from a Kafka topic. The job is scheduled with cron. I can't use Spark Structured Streaming because non-time-based windows are not supported.

val df = spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", "...")
        .option("subscribe", "kafka_topic")
        .load()

I need to set the offsets for the Kafka topic so that the next batch job knows where to start reading. How can I do that?

Upvotes: 2

Views: 1063

Answers (1)

Sachin Thapa

Reputation: 3719

I assume you are using KafkaUtils to create the stream; in that case you can pass the starting offsets as a parameter:

// fromOffsets: Map[TopicPartition, Long] holding the offset to start from for each partition
val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
                           Assign[String, String](fromOffsets.keys, kafkaParams, fromOffsets))
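
If you are on the batch reader from the question (spark.read.format("kafka")) rather than KafkaUtils, the Kafka source also accepts a "startingOffsets" option that takes a JSON string of per-partition offsets. Below is a rough sketch of building that string from offsets you saved after the previous run. OffsetJson and the nextOffsets map are hypothetical names I made up for illustration, and where you persist the offsets between runs is up to you.

```scala
// Sketch only: OffsetJson is a hypothetical helper, not part of Spark or Kafka.
object OffsetJson {
  // nextOffsets maps (topic, partition) to the first offset the next run should read.
  // Returns JSON in the shape {"topic":{"partition":offset,...},...}.
  def startingOffsets(nextOffsets: Map[(String, Int), Long]): String =
    nextOffsets
      .groupBy { case ((topic, _), _) => topic }
      .toSeq
      .sortBy(_._1) // stable topic order
      .map { case (topic, entries) =>
        val partitions = entries.toSeq
          .map { case ((_, partition), offset) => partition -> offset }
          .sortBy(_._1) // stable partition order
          .map { case (partition, offset) => s""""$partition":$offset""" }
          .mkString(",")
        s""""$topic":{$partitions}"""
      }
      .mkString("{", ",", "}")
}

// Assumed usage with the reader from the question (saved is your stored map):
//   spark.read.format("kafka")
//     .option("kafka.bootstrap.servers", "...")
//     .option("subscribe", "kafka_topic")
//     .option("startingOffsets", OffsetJson.startingOffsets(saved))
//     .load()
```

The DataFrame returned by the Kafka source has topic, partition and offset columns, so one way to get the values for the next run is to take the maximum offset per partition from the batch you just processed and add one.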

Hope this helps!

Upvotes: 1
