Shades88

Reputation: 8360

Kafka Consumer error: Marking coordinator dead

I have a topic with 10 partitions in a Kafka 0.10.0.1 cluster. I have an application that spawns multiple consumer threads; for this topic I am spawning 5 threads. Many times in my application logs I see this entry:

INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092
(id:2147483646 rack: null) dead for group notifications-consumer

Then there are several entries saying (Re-)joining group notifications-consumer. Afterwards I also see one warning saying

Auto commit failed for group notifications-consumer: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time 
message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned by poll() with max.poll.records.

Now I have already adjusted my consumer config like so

props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);

So, even after properly adjusting the config I am still getting this error. During the rebalance our app is completely unresponsive. Please help.

Upvotes: 8

Views: 8905

Answers (1)

gabrielgiussi

Reputation: 9575

With session.timeout.ms you only control timeouts due to heartbeats: if session.timeout.ms milliseconds pass since the last heartbeat, the cluster declares the consumer a dead node and triggers a rebalance.

Before KIP-62 the heartbeat was sent within the poll, but it has since been moved to a dedicated background thread, so you are not evicted from the group just because you take longer than session.timeout.ms to call poll() again. Separating the heartbeat into its own thread decouples processing from telling the cluster that you are up and running, but it also introduced the risk of "livelock" situations in which the process is alive but not making progress. So, besides making the heartbeat independent of the poll, a new timeout was introduced to ensure that the consumer is alive and making progress. The documentation says this about the pre-KIP-62 implementation:

As long as the consumer is sending heartbeats, it basically holds a lock on the partitions it was assigned. If the process becomes defunct in such a way that it cannot make progress but is nevertheless continuing to send heartbeats, then no other member in the group will be able to take over the partitions, which causes increasing lag. The fact that heartbeating and processing is all done in the same thread, however, guarantees that consumers must make progress to keep their assignment. Any stall which affects processing also affects heartbeats.

The changes introduced by KIP-62 include:

Decoupling the processing timeout: We propose to introduce a separate locally enforced timeout for record processing and a background thread to keep the session active until this timeout expires. We call this new timeout as the "process timeout" and expose it in the consumer's configuration as max.poll.interval.ms. This config sets the maximum delay between client calls to poll()

From the logs you posted I think you may be in this situation: your app is taking longer than max.poll.interval.ms (5 minutes by default) to process the 200 polled records. If that is the case, your only options are to reduce max.poll.records even further or to increase max.poll.interval.ms.
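For example, something along these lines (a minimal sketch; the concrete numbers are just assumptions to tune against your real per-record processing time, not recommendations):

// give the poll loop more headroom between poll() calls (default is 300000 = 5 min)
props.put("max.poll.interval.ms", 600000);
// and/or hand back fewer records per poll() so each batch finishes sooner
props.put("max.poll.records", 50);

Whichever way you go, the time spent processing one batch has to stay below max.poll.interval.ms.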

PS:

The max.poll.interval.ms configuration that appears in your log only exists from Kafka 0.10.1.0 onwards, so I assume you made a small mistake with the version there.

Update

Correct me if I understood you wrong, but in your last comment you said that you are creating 25 consumers (e.g. 25 org.apache.kafka.clients.consumer.KafkaConsumer instances if you are using Java) and subscribing them to N different topics, all with the same group.id. If that is correct, you will see rebalancing each time a KafkaConsumer is started or stopped, because it sends a JoinGroup or LeaveGroup message (see the corresponding Kafka protocol) that contains the group.id and the member.id (the member.id is not the host, so two consumers created in the same process still have different ids). Note that these messages don't contain topic subscription information (that information lives in the brokers, but Kafka doesn't use it for rebalancing). So each time the cluster receives a JoinGroup or a LeaveGroup for group.id X, it triggers a rebalance for all consumers with the same group.id X.

If you start 25 consumers with the same group.id, you will see rebalancing until the last consumer has joined and the corresponding rebalance has finished (if you keep seeing it after that, you may be stopping consumers).

I ran into this issue a couple of months ago.

If we have two KafkaConsumers using the same group.id (running in the same process or in two different processes) and one of them is closed, it triggers a rebalance in the other KafkaConsumer even though they are subscribed to different topics. I suppose the brokers only take the group.id into account for a rebalance, not the subscribed topics corresponding to the (group_id, member_id) pair of the LeaveGroupRequest. I wondered whether this is the expected behaviour or something that should be improved; it is probably the former, to avoid a more complex rebalance in the broker, and the solution is very simple anyway: just use different group ids for KafkaConsumers that subscribe to different topics, even if they run in the same process.
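As a small sketch of that workaround (the topic and group names here are made up), each topic gets its own group.id so that closing one consumer never rebalances the other:

Properties base = new Properties();
base.put("bootstrap.servers", "x.x.x.x:9092");
base.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
base.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Properties notifProps = new Properties();
notifProps.putAll(base);
notifProps.put("group.id", "notifications-consumer");   // group used only for this topic
KafkaConsumer<String, String> notifications = new KafkaConsumer<>(notifProps);
notifications.subscribe(Collections.singletonList("notifications"));

Properties billingProps = new Properties();
billingProps.putAll(base);
billingProps.put("group.id", "billing-consumer");       // different group.id for the other topic
KafkaConsumer<String, String> billing = new KafkaConsumer<>(billingProps);
billing.subscribe(Collections.singletonList("billing"));

// Closing one consumer sends a LeaveGroup only for its own group.id,
// so the other group's partition assignment is untouched.
billing.close();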


When rebalance occurs we see duplicate messages coming

This is the expected behaviour: one consumer consumes the message, but a rebalance is triggered before the offset is committed and the commit fails. When the rebalance finishes, the process that now holds that partition assignment consumes the message again (until a commit succeeds).
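If the duplicates are a problem, a common mitigation (just a sketch, with a hypothetical process() method; this is still at-least-once, not exactly-once) is to turn off auto-commit and commit only after a batch has been processed, so re-delivery is limited to the batch in flight when the rebalance hit:

props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("notifications"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // must be idempotent: after a rebalance the same record can arrive again
    }
    try {
        consumer.commitSync(); // commit only what was actually processed
    } catch (CommitFailedException e) {
        // a rebalance happened mid-batch; the partition's new owner will re-read these records
    }
}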

I segregated them into two groups, and now the problem has suddenly disappeared for the past 2 hours.

You hit the nail on the head here, but if you don't want to see any (avoidable) rebalancing you should use a different group.id for each topic.

Here is a great talk about different rebalancing scenarios.

Upvotes: 6
