Venkata Madhu

Reputation: 113

Issue of Kafka Balancing at high load

We are using Kafka version 2.11-0.11.0.3 to publish 10,000 messages (total size of all messages is 10 MB), with 2 consumers (sharing the same group id) consuming the messages in parallel. While consuming, the same message was consumed by both consumers.

The errors/warnings below were thrown by Kafka:

WARN: This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

INFO: Attempt to heartbeat failed since group is rebalancing

INFO: Sending LeaveGroup request to coordinator

WARN: Synchronous auto-commit of offsets {ingest-data-1=OffsetAndMetadata{offset=5506, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

The configuration below was provided to Kafka:

server.properties

max.poll.interval.ms=30000
group.initial.rebalance.delay.ms=0
group.max.session.timeout.ms=120000
group.min.session.timeout.ms=6000

consumer.properties

session.timeout.ms=30000 
request.timeout.ms=40000

What should be changed to resolve the duplicate consumption?

Upvotes: 1

Views: 11905

Answers (2)

LIU YUE

Reputation: 1987

It's straightforward; the problem is your config:

max.poll.interval.ms=30000

That's just 30 seconds, which means your consumer has to finish processing each batch of messages (batch size is configured by max.poll.records, default 500) within 30 seconds. But the error/warning clearly tells you that your processing time exceeds 30 seconds. What you should do is lower max.poll.records, increase max.poll.interval.ms, or both.
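As a concrete sketch (the values are illustrative assumptions; tune them to your actual per-batch processing time), the consumer configuration could look like:

```
max.poll.records=100
max.poll.interval.ms=300000
```

Fewer records per poll shortens each batch, and 300000 ms (5 minutes, the default for this setting in newer clients) leaves far more headroom than 30 seconds.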

Additionally, I'd like to share one more scenario I encountered: my program registered a customized ConsumerRebalanceListener. When the first poll() call triggered the rebalance, the program performed a lot of initialization work, some of which involved interacting with Kafka metadata and took longer than 5 minutes or so. That produced the same error: the consumer left the group, which triggered another rebalance. If you're interested in it, you can find it here

Upvotes: 1

pgras

Reputation: 12770

Are your consumers in the same group? If yes, you will see duplicate consumption whenever a consumer leaves/dies/times out without having committed some messages it has already processed.

If all your messages are consumed by both consumers you probably have not set the same group id for them.

More info:

So you have set the same group id for all consumers, good. You are in the situation where the cluster/broker thinks that a consumer died and therefore rebalances the load to another one. This other one will start consuming where the last commit was done.

So let's say consumer C_A read offsets up to 100 from partition P_1, processed them, and committed '100'; then it read offsets up to 200 and processed them, but could not commit because the broker considered C_A dead.

The broker reassigns partition P_1 to consumer C_B which will start from the last commit for the group, which is 100, will read up to 200, process and commit 200.
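The commit/redelivery sequence above can be sketched as a toy model (the class, method, and variable names are hypothetical, and this uses no Kafka APIs, just plain Java):

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliveryDemo {
    // Last committed offset for the group on partition P_1.
    static long committed = 0;

    // A consumer polls records after `from` up to `to`, processes them, and
    // tries to commit; returns the offsets it processed in this poll.
    static List<Long> pollAndProcess(long from, long to, boolean commitSucceeds) {
        List<Long> processed = new ArrayList<>();
        for (long o = from + 1; o <= to; o++) {
            processed.add(o);
        }
        if (commitSucceeds) {
            committed = to;  // commit only moves forward if the broker accepts it
        }
        return processed;
    }

    public static void main(String[] args) {
        // C_A reads up to 100 and commits successfully...
        pollAndProcess(committed, 100, true);
        // ...then reads up to 200, but is kicked out before the commit lands.
        pollAndProcess(committed, 200, false);
        // After the rebalance, C_B resumes from the last commit (100),
        // so offsets 101..200 are processed a second time.
        List<Long> redelivered = pollAndProcess(committed, 200, true);
        System.out.println("C_B resumes at offset " + redelivered.get(0)
                + ", reprocessing " + redelivered.size() + " records");
    }
}
```

This is the at-least-once delivery guarantee in action: a failed commit means the next owner of the partition replays everything since the last successful commit.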

So your question is how to avoid that the consumer is considered as dead (I assume it is not dead)?

The answer is already in the WARN message quoted in your question: you can tell your consumer to fetch fewer messages per poll (max.poll.records) to reduce the processing time between two polls to the broker, AND/OR you can increase max.poll.interval.ms, telling the broker to wait longer before considering your consumer dead...
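As a sketch of those two knobs in code (the group id and values are illustrative assumptions; in a real application you would pass these properties, plus deserializer settings, to new KafkaConsumer<>(props)):

```java
import java.util.Properties;

public class ConsumerTuning {
    static Properties tunedProps() {
        Properties props = new Properties();
        props.setProperty("group.id", "ingest-group");       // must be identical for both consumers
        props.setProperty("max.poll.records", "100");        // smaller batches => less work between polls
        props.setProperty("max.poll.interval.ms", "300000"); // allow up to 5 min of processing per batch
        props.setProperty("session.timeout.ms", "30000");    // heartbeat-based liveness, separate concern
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedProps();
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
        System.out.println("max.poll.interval.ms=" + props.getProperty("max.poll.interval.ms"));
    }
}
```

Note that session.timeout.ms and max.poll.interval.ms are independent checks: the first is driven by the background heartbeat thread, the second by how often your code calls poll().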

Upvotes: 1
