I am using Kafka 2.11-0.11.0.3 to publish 10,000 messages (10 MB in total). Two consumers with the same group id consume the messages in parallel. While consuming, the same messages were consumed by both consumers.
The Kafka client logged the following warnings and messages:
WARN: This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO: Attempt to heartbeat failed since group is rebalancing
INFO: Sending LeaveGroup request to coordinator
WARN: Synchronous auto-commit of offsets {ingest-data-1=OffsetAndMetadata{offset=5506, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
The following configuration was provided to Kafka:
server.properties
max.poll.interval.ms=30000
group.initial.rebalance.delay.ms=0
group.max.session.timeout.ms=120000
group.min.session.timeout.ms=6000
consumer.properties
session.timeout.ms=30000
request.timeout.ms=40000
What should be changed to stop the same messages from being consumed more than once?
It's straightforward; the problem is your configuration:
max.poll.interval.ms=30000
That is only 30 seconds, which means your consumer has to finish processing each batch of messages (the batch size is set by max.poll.records, default 500) within 30 seconds. The warning tells you that your processing takes longer than that, so you should lower max.poll.records, increase max.poll.interval.ms, or do both.
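To make the trade-off concrete, here is a minimal Python sketch that sizes the two settings from a measured per-record processing time. The 100 ms per record and the 2x safety factor are assumptions for illustration, not values from the question, and both functions are hypothetical helpers, not part of any Kafka client.

```python
import math


def max_safe_poll_records(max_poll_interval_ms: int,
                          per_record_ms: float,
                          safety_factor: float = 2.0) -> int:
    """Largest max.poll.records whose full batch fits in max.poll.interval.ms.

    Only 1/safety_factor of the interval is budgeted for processing, leaving
    headroom for slow records. Never returns less than 1; if even one record
    exceeds the budget, max.poll.interval.ms itself has to grow.
    """
    if per_record_ms <= 0 or safety_factor <= 0:
        raise ValueError("per_record_ms and safety_factor must be positive")
    budget_ms = max_poll_interval_ms / safety_factor
    return max(1, math.floor(budget_ms / per_record_ms))


def required_poll_interval_ms(max_poll_records: int,
                              per_record_ms: float,
                              safety_factor: float = 2.0) -> int:
    """Smallest max.poll.interval.ms that covers a full batch with headroom."""
    return math.ceil(max_poll_records * per_record_ms * safety_factor)


# Hypothetical measurement: 100 ms of processing per record.
print(max_safe_poll_records(30000, 100))    # 150 records per poll
print(required_poll_interval_ms(500, 100))  # 100000 ms for the default 500
```

With the question's 30000 ms, a default batch of 500 records at 100 ms each needs 50 seconds and overruns the interval; either capping the batch at 150 records or raising the interval to 100000 ms would fit under these assumptions.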
I'd also like to share one more scenario I ran into: my program used a custom ConsumerRebalanceListener. When the first poll() call triggered a rebalance, my program ran a lot of initialization jobs, some of which interacted with Kafka metadata and took longer than 5 minutes or so. That produced the same error: the consumer left the group, which triggered another rebalance. If you're interested, you can find it here.
Are your consumers in the same group? If so, you will see duplicate consumption whenever a consumer leaves, dies, or times out before committing the offsets of messages it has already processed.
If all your messages are consumed by both consumers, you have probably not given them the same group id.
More info:
So you have set the same group id for all consumers; good. You are in a situation where the broker (the group coordinator) considers one consumer dead and therefore rebalances its partitions to another consumer. That other consumer starts consuming from the group's last committed offset.
Say consumer C_A reads offsets up to 100 from partition P_1, processes them, and commits 100. It then reads offsets up to 200 and processes them, but cannot commit because the broker now considers C_A dead.
The broker reassigns partition P_1 to consumer C_B, which starts from the group's last commit (100), reads up to 200, processes those messages, and commits 200. The messages between offsets 100 and 200 have therefore been processed twice, once by C_A and once by C_B.
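The C_A/C_B walk-through can be replayed in a few lines. This is a minimal Python simulation of a single partition, not Kafka code; it assumes "read up to 100" means offsets 0 to 99 and uses Kafka's convention that a committed offset is the position of the next record to read. The function name is hypothetical.

```python
from collections import Counter


def simulate_rebalance(first_commit: int = 100, last_read: int = 200):
    """Replay the C_A/C_B scenario on one partition and report duplicates.

    A committed offset is the next offset to read, as in Kafka.
    """
    processed = Counter()  # offset -> how many times it was processed

    # C_A processes offsets [0, first_commit) and commits first_commit.
    for offset in range(0, first_commit):
        processed[offset] += 1
    committed = first_commit

    # C_A processes [first_commit, last_read) but its commit is rejected
    # because the group has already rebalanced; `committed` is unchanged.
    for offset in range(first_commit, last_read):
        processed[offset] += 1

    # C_B now owns the partition and resumes from the last committed offset.
    for offset in range(committed, last_read):
        processed[offset] += 1
    committed = last_read

    duplicates = sorted(o for o, n in processed.items() if n > 1)
    return committed, duplicates


committed, duplicates = simulate_rebalance()
print(committed)                                       # 200
print(len(duplicates), duplicates[0], duplicates[-1])  # 100 100 199
```

Every record between the last successful commit and the rejected one is processed twice. Smaller batches (a lower max.poll.records) shrink that window of replayable records, but they do not remove it.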
So your real question is how to keep the consumer from being considered dead (I assume it is not actually dead).
The answer is already in the WARN message in your question: you can make your consumer fetch fewer messages per poll (max.poll.records) to reduce the processing time between two polls, AND/OR you can increase max.poll.interval.ms so that the consumer is allowed more time between polls before it is considered dead.
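Note that max.poll.interval.ms, like max.poll.records and session.timeout.ms, is a consumer setting rather than a broker setting, so it has to be supplied to each consumer, not only to server.properties. Below is a minimal Python sketch that renders such a consumer.properties fragment and checks session.timeout.ms against the broker's group.min.session.timeout.ms and group.max.session.timeout.ms range (6000 and 120000 in the question). The group id "ingest-consumers" and the chosen values are hypothetical.

```python
def render_consumer_properties(group_id: str,
                               max_poll_records: int,
                               max_poll_interval_ms: int,
                               session_timeout_ms: int,
                               broker_min_session_ms: int = 6000,
                               broker_max_session_ms: int = 120000) -> str:
    """Return consumer.properties lines for one consumer in the group."""
    if max_poll_records < 1:
        raise ValueError("max.poll.records must be at least 1")
    # The coordinator rejects a member whose session timeout falls outside
    # [group.min.session.timeout.ms, group.max.session.timeout.ms].
    if not broker_min_session_ms <= session_timeout_ms <= broker_max_session_ms:
        raise ValueError("session.timeout.ms is outside the broker's allowed range")
    props = {
        "group.id": group_id,                          # same for every consumer
        "max.poll.records": max_poll_records,          # smaller batches per poll()
        "max.poll.interval.ms": max_poll_interval_ms,  # more time between polls
        "session.timeout.ms": session_timeout_ms,
    }
    return "\n".join(f"{key}={value}" for key, value in props.items())


print(render_consumer_properties("ingest-consumers", 150, 100000, 30000))
```

The same four keys would go into the properties passed to each consumer instance; the values here are only an example of lowering the batch size and raising the poll interval together.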