Reputation: 377
I am trying to simulate the CommitFailedException thrown by Kafka.
I have manually set "session.timeout.ms" to 10000 ms and "enable.auto.commit" to false.
After KafkaConsumer.poll(), I call Thread.sleep(12000) and then commit. Since the thread takes 12 s to reach the next poll, I would expect the consumer to be marked as dead and a CommitFailedException to be thrown. However, the process runs without any error.
How can I simulate this exception being thrown by KafkaConsumer?
    consumer.subscribe(Arrays.asList("foo"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
        }
        try {
            Thread.sleep(12000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        consumer.commitSync();
    }
Upvotes: 0
Views: 1275
Reputation: 3579
Kafka uses a heartbeat mechanism, running on a separate thread, to check the health of a consumer. The consumer's heartbeat thread must send a heartbeat to the broker before session.timeout.ms expires.
heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.
session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
Another mechanism for checking consumer liveness is polling. A consumer is expected to call poll() again before max.poll.interval.ms expires. If this time expires (usually because processing takes too long), the consumer is again considered dead.
max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
If Kafka considers a consumer dead, either because no heartbeat arrived within session.timeout.ms or because poll() was not called within max.poll.interval.ms, the consumer cannot commit offsets and gets a CommitFailedException.
CommitFailedException: This exception is raised when an offset commit with KafkaConsumer.commitSync() fails with an unrecoverable error. This can happen when a group rebalance completes before the commit could be successfully applied. In this case, the commit cannot generally be retried because some of the partitions may have already been assigned to another member in the group.
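As a result, because the heartbeat is sent from a separate thread, the sleep in your code does not affect it: heartbeats keep arriving within session.timeout.ms while the main thread sleeps. The 12-second sleep also stays far below the default max.poll.interval.ms of 300000 ms (5 minutes), so neither check fails. In your case, you can set max.poll.interval.ms to 10 seconds, which is shorter than the sleep, to get a CommitFailedException.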
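The suggestion above can be sketched as a consumer configuration. This is a minimal sketch, not a definitive setup: the broker address localhost:9092, the group id commit-failed-demo, and the class name CommitFailedConfig are assumptions made for illustration. The property keys are standard Kafka consumer settings, written as plain strings so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

class CommitFailedConfig {
    // Builds consumer properties intended to provoke a CommitFailedException
    // when the processing loop sleeps for 12 seconds between polls.
    static Properties consumerProps() {
        Properties props = new Properties();
        // Assumed values for illustration; replace with your own cluster and group.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "commit-failed-demo");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Settings taken from the question.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("session.timeout.ms", "10000");
        // The change proposed in the answer: 10 s is shorter than the 12 s sleep,
        // so the consumer should be considered failed before it commits.
        props.setProperty("max.poll.interval.ms", "10000");
        return props;
    }
}
```

Passing these properties to new KafkaConsumer<>(props) and running the loop from the question unchanged should make commitSync() fail once the 12-second sleep exceeds the 10-second poll interval, provided the client version supports max.poll.interval.ms (0.10.1 or later).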
Upvotes: 1