Hugo Carmona

Reputation: 304

Kafka CommitFailedException consumer exception

After creating multiple consumers (using the Kafka 0.9 Java API) and starting each thread, I'm getting the following exception:

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

The consumers then start consuming messages normally. I would like to know what is causing this exception, in order to fix it.
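For context, each consumer thread runs a loop along these lines (a reconstruction, since the post omits the code; the broker address, group id, and topic name are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThread implements Runnable {
    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("enable.auto.commit", "false");         // offsets are committed manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // slow processing here delays the next poll()
                }
                consumer.commitSync(); // this is the call that throws CommitFailedException
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // application-specific work
    }
}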

Upvotes: 8

Views: 27692

Answers (2)

CodingBee

Reputation: 1199

Two possible reasons:

  1. If there are any network failures, the consumer cannot reach the broker and will throw this exception. In my case there were no network failures when these exceptions occurred.
  2. As the error trace says, if too much time is spent processing messages between polls, the coordinator considers the consumer dead and triggers a rebalance, so the commit fails. This happens because, in these client versions, heartbeating is tied to the poll loop (see the sketch after this list).
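To illustrate reason 2, here is a minimal sketch: the batch takes too long to process, the group rebalances, and the subsequent commitSync() throws. Catching the exception lets the next poll() rejoin the group. (An already-subscribed consumer is assumed, and slowProcessing() stands in for whatever work takes too long.)

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// "consumer" is an already-subscribed KafkaConsumer<String, String>
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        slowProcessing(record); // exceeding the allowed poll interval evicts us from the group
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // The group rebalanced while we were processing, so our partitions may
        // have been reassigned and this batch may be redelivered elsewhere.
        // The next poll() rejoins the group.
    }
}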

The values given here are the default Kafka consumer configuration values:

request.timeout.ms=40000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000
max.poll.records=500
session.timeout.ms=10000

Solution:

I first reduced max.poll.records to 100, but the exception still occurred at times. So I changed the configuration as below:

request.timeout.ms=300000
heartbeat.interval.ms=1000 
max.poll.interval.ms=900000
max.poll.records=100
session.timeout.ms=600000

I reduced the heartbeat interval so that the broker is told more frequently that the consumer is still active, and I also increased the session timeout.
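Applied when building the consumer, that configuration looks like the sketch below (bootstrap servers, group id, and deserializers are placeholders). Note that session.timeout.ms must fall within the broker's group.min.session.timeout.ms / group.max.session.timeout.ms bounds, so a value this large may also require a broker-side change:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "my-group");                // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("request.timeout.ms", "300000");
props.put("heartbeat.interval.ms", "1000");
props.put("max.poll.interval.ms", "900000");
props.put("max.poll.records", "100");
props.put("session.timeout.ms", "600000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);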

Upvotes: 0

ajkret

Reputation: 392

Also try tweaking the following parameters:

  • heartbeat.interval.ms - the interval at which the consumer sends heartbeats to Kafka; if no heartbeat arrives within the session timeout, the consumer is considered "dead" and its partitions are reassigned
  • max.partition.fetch.bytes - limits the amount of data (and therefore, indirectly, the number of messages) the consumer receives per partition when polling

I noticed that rebalancing occurs if the consumer does not get back to Kafka before the session times out. Since the commit happens after the messages are processed, the time needed to process them dictates how these parameters should be set. So decreasing the number of messages per poll and increasing the session timeout will help to avoid rebalancing.
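A sketch of that tuning with illustrative values only (the client defaults are 1 MB per partition and a 3-second heartbeat; "props" is the Properties object the consumer is built from):

// Fetch less data per partition so each batch is processed quickly,
// and heartbeat more often relative to the session timeout.
props.put("max.partition.fetch.bytes", "65536"); // 64 KB instead of the default 1 MB
props.put("heartbeat.interval.ms", "1000");      // default is 3000
props.put("session.timeout.ms", "30000");        // give processing more headroom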

Also consider using more partitions, so there will be more threads processing your data, even with fewer messages per poll.

I wrote this small application to run tests. Hope it helps.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x now offers a new parameter to control the number of records received: max.poll.records - the maximum number of records returned in a single call to poll().
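For example (the value is illustrative):

props.put("max.poll.records", "100"); // cap how many records a single poll() returns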

UPDATE

Kafka also offers a way to pause the consumer. While it is paused, you can process the messages in a separate thread, which leaves you free to keep calling KafkaConsumer.poll() to send heartbeats. Then call KafkaConsumer.resume() after the processing is done. This way you mitigate rebalances caused by missed heartbeats. Here is an outline of what you can do:

// "workers" is an ExecutorService defined elsewhere
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
    consumer.commitSync(); // commit up front (at-most-once for this batch)

    // While paused, poll() still sends heartbeats but returns no records.
    // Note: pause()/resume() take a Collection in Kafka 0.10+; in 0.9 they are varargs.
    consumer.pause(consumer.assignment());
    for (ConsumerRecord<String, String> record : records) {

        Future<Boolean> future = workers.submit(() -> {
            // Process the record here
            return true;
        });

        // Wait for the worker, polling once per second so heartbeats keep flowing
        while (true) {
            try {
                if (future.get(1, TimeUnit.SECONDS) != null) {
                    break;
                }
            } catch (java.util.concurrent.TimeoutException e) {
                consumer.poll(0); // no records while paused, just heartbeats
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    consumer.resume(consumer.assignment());
}

Upvotes: 11
