Reputation: 146
I'm writing a Spark (v2.2) batch job which reads from a Kafka topic. Spark jobs are scheduling with cron. I can't use Spark Structured Streaming because non based-time windows are not supported.
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("subscribe", s"kafka_topic")
I need to set the offset for the kafka topic to know from where to start the next batch job. How can I do that?
Upvotes: 2
Views: 1063
Reputation: 3719
I guess you are using KafkaUtils to create stream, you can pass this as parameter.
val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))
Hoping this helps !
Upvotes: 1