Reputation: 387
I am trying to verify how manual offset commit works.
When I try to exit the job, either by using thread.sleep(), jssc.stop(), or throwing exceptions in the while loop, I see the offsets being committed.
I am just sending a couple of messages in order to test, but I see 0 lag as soon as the job starts processing the batch.
When does Spark actually commit the offsets?
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch Kafka offsets for manually committing them later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();

    // filter unwanted data
    kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
        @Override
        public Boolean call(ConsumerRecord<String, String> record) {
            // filter logic here
            return true;
        }
    }).foreachPartition(kafkaRecords -> {
        // initializing DB connections
        while (kafkaRecords.hasNext()) {
            // doing some work here
            // -----> EXCEPTION
            throw new Exception();
        }
    });

    // commit offsets after processing
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
            System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------" + exception.getCause());
            exception.printStackTrace(); // need this for driver
        } else {
            System.out.println("Successfully committed offsets"); // need this for driver
            for (OffsetRange offsetRange : offsetRanges) {
                System.out.println("Offset Info: partition " + offsetRange.partition() + ", fromOffset " + offsetRange.fromOffset() + ", untilOffset " + offsetRange.untilOffset());
            }
        }
    });
});
enable.auto.commit: false
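For context, my kafkaParams look roughly like this; the broker address and group id below are placeholders, not the real values from my job:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");   // placeholder
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-consumer-group");         // placeholder
kafkaParams.put("auto.offset.reset", "earliest");
// auto commit disabled so offsets are only committed via commitAsync
kafkaParams.put("enable.auto.commit", false);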
Observe the throw new Exception(); in the while loop. Even if the batch fails because of the exception, I see the offsets committed. I was expecting some lag here since the processing failed. What is wrong here?
Upvotes: 3
Views: 1534
Reputation: 3842
The beauty of Spark's direct stream on Kafka is that it gives you manual control over offsets, which is not available in Kafka Streams. The commit is thread-safe and asynchronous, and since Kafka is not transactional, your outputs must still be idempotent. In other words, while you are consuming messages the offsets keep incrementing, whereas the commit itself may show up later. As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if it is done on the result of createDirectStream, not after transformations. The commitAsync call is thread-safe but must occur after your outputs.
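As a sketch in Java, mirroring your code: take the offsets from the RDD handed to foreachRDD on the direct stream itself; once you map or filter, the resulting RDD no longer carries offset ranges, and commit on the original input stream only after the output work:

// the cast works here, on the RDD produced by createDirectStream...
kafkaStream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // ...but not on a transformed RDD: transformed.rdd() is no longer a KafkaRDD,
    // so ((HasOffsetRanges) transformed.rdd()) would throw a ClassCastException
    JavaRDD<String> transformed = rdd.map(ConsumerRecord::value);

    transformed.foreachPartition(it -> { /* write output; must be idempotent */ });

    // commit on the original input stream, only after the output work above
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
});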
You can check how your commit performs by using a callback, as below:
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.CanCommitOffsets

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback() {
  override def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
    m.asScala.foreach(f => {
      if (null != e) {
        logger.info("Failed to commit: " + f._1 + "," + f._2)
        logger.info("Error while commitAsync. Retry again: " + e.toString)
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } else {
        println("Offset commit: " + f._1 + "," + f._2)
      }
    })
  }
})
Upvotes: 2
Reputation: 387
In case of exceptions on a worker node, the task is resubmitted up to spark.task.maxFailures times (the number of failures of any particular task before giving up on the job). Offsets are committed once the DStream batch is processed. You have to handle the exception yourself (logging the error record or forwarding the record to a DLQ) based on your use case.
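For example, something along these lines inside foreachPartition, so the batch completes and the committed offsets reflect what was actually handled; the DLQ topic name and producer config here are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");                 // placeholder
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// in practice you would reuse a producer per executor instead of creating one per partition
try (KafkaProducer<String, String> dlqProducer = new KafkaProducer<>(props)) {
    while (kafkaRecords.hasNext()) {
        ConsumerRecord<String, String> record = kafkaRecords.next();
        try {
            // doing some work here
        } catch (Exception e) {
            // bad record: log it and forward to a DLQ topic instead of failing the whole task
            dlqProducer.send(new ProducerRecord<>("my-topic-dlq", record.key(), record.value()));
        }
    }
}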
Upvotes: 0