Maciej eM

Reputation: 61

Kafka Consumer death handling

I have a question about how consumer death is handled when the timeout values are exceeded.

My example configuration:

session.timeout.ms = 10000 (10 seconds)
heartbeat.interval.ms = 2000 (2 seconds)
max.poll.interval.ms = 300000 (5 minutes)
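
For reference, the same settings expressed as Java consumer properties might look roughly like the sketch below. The `bootstrap.servers` and `group.id` values are placeholders I made up for illustration, and the class name `ConsumerTimeoutConfig` is hypothetical; only the three timeout keys and their values come from the configuration above.

```java
import java.util.Properties;

class ConsumerTimeoutConfig {
    // Builds the timeout-related consumer settings listed above.
    // "bootstrap.servers" and "group.id" are assumed placeholder values.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption
        props.setProperty("group.id", "example-group");           // assumption
        props.setProperty("session.timeout.ms", "10000");         // 10 seconds
        props.setProperty("heartbeat.interval.ms", "2000");       // 2 seconds
        props.setProperty("max.poll.interval.ms", "300000");      // 5 minutes
        return props;
    }

    public static void main(String[] args) {
        // Print every key/value pair so the effective settings are visible.
        build().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```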

I have 1 topic, 10 partitions, 1 consumer group, 10 consumers (1 partition = 1 consumer).

From my understanding consuming messages in Kafka, very simplified, works as follows:

  1. consumer polls 100 records from topic
  2. a heartbeat signal is sent to broker
  3. processing records in progress
  4. processing records completes
  5. finalize processing (commit, do nothing etc.)
  6. repeat #1-5 in a loop

My question is what happens if the time between heartbeats exceeds the configured session.timeout.ms. I understand that if the session times out, the broker initiates a rebalance, the consumer whose processing took longer than session.timeout.ms is marked as dead, and a different consumer is assigned to that partition.

Okay, but what then?

  1. Is the long-processing consumer removed/unsubscribed from the topic, leaving my application with 9 working consumers? What if all the consumers exceed the timeout and are all considered dead? Am I left with a running application that does nothing because there are no consumers?
  2. If the long-processing consumer finishes processing after the rebalance has already taken place, does the broker initiate a rebalance again and assign the consumer a partition anew? As I understand it, the consumer keeps running #1-5 in a loop, and sending a heartbeat to the broker also starts the process of adding the consumer back to the consumer group from which it was removed after being marked dead. Is that correct?
  3. Does the application throw some sort of exception indicating that session.timeout.ms was exceeded, abruptly stopping the processing?
  4. What about the max.poll.interval.ms property? What if we exceed even that period and consumer X finishes processing after max.poll.interval.ms has elapsed? The consumer has already exceeded session.timeout.ms, been excluded from the consumer group, and had its status set to dead, so what difference does max.poll.interval.ms make when configuring a Kafka consumer?

We have a process that extracts data for processing, and this extraction consists of 50+ SQL queries (the majority being SELECTs, with a few UPDATEs). They are usually fast, but that depends on the DB load, possible locks, etc., so the processing may take longer than the session timeout. I do not want to keep increasing the session timeout until I "hit the spot". The process is idempotent; if it is repeated X times within X minutes, we do not care.

Upvotes: 3

Views: 3283

Answers (1)

ChristDist

Reputation: 778

Please find the answers.

#1. Yes. If all of your consumer instances are kicked out of the consumer group due to session.timeout.ms, you will be left with zero active consumer instances. Unless the consumers rejoin the group (see #2), the consumer application is effectively dead until you restart it.

#2. This depends on how you write your consumer code with respect to poll() and iterating over the consumer records. If you have a proper while(true) loop with a try/catch inside it, your consumer will be able to rejoin the consumer group after processing that long-running record.
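
To illustrate the loop shape described in #2, here is a minimal sketch that runs without a broker. `RecordClient` is a hypothetical stand-in I introduced for the consumer client, and `IllegalStateException` stands in for a failed commit; with the real Java client the corresponding calls would be `KafkaConsumer.poll(Duration)` and `commitSync()`, and the exception to catch would be `CommitFailedException`. The point is only that the try/catch sits inside the loop, so one failed commit does not end the loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the consumer client (not part of any Kafka API).
interface RecordClient {
    List<String> poll();
    void commit();
}

class ResilientPollLoop {
    // Runs a fixed number of poll cycles and returns everything that was processed.
    // A real service would use while (true) instead of a bounded loop.
    static List<String> run(RecordClient client, int iterations) {
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            try {
                for (String record : client.poll()) {
                    processed.add(record); // idempotent processing would go here
                }
                client.commit();
            } catch (IllegalStateException e) {
                // Stand-in for a commit that failed because the group rebalanced.
                // Because the catch is inside the loop, the next iteration polls again.
                System.err.println("commit failed, continuing: " + e.getMessage());
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Fake client whose second commit fails, simulating a rebalance.
        RecordClient flaky = new RecordClient() {
            private int polls = 0;
            public List<String> poll() {
                polls++;
                return Collections.singletonList("record-" + polls);
            }
            public void commit() {
                if (polls == 2) {
                    throw new IllegalStateException("group already rebalanced");
                }
            }
        };
        System.out.println(run(flaky, 3));
    }
}
```

Since the processing in the question is idempotent, reprocessing the batch whose commit failed is acceptable here; a non-idempotent workload would need more care.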

#3. You will end up with a commit failed exception (CommitFailedException in the Java client):

failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

And again, whether the consumer automatically rejoins the consumer group depends on your code.
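
The error message above suggests reducing max.poll.records, and that advice can be turned into rough arithmetic. The sketch below is only a back-of-the-envelope helper: the class `PollBudget`, the 5-second worst-case per-record time, and the 50% safety margin are all assumptions of mine, not values from the question.

```java
class PollBudget {
    // Largest batch size whose worst-case processing time still fits inside
    // max.poll.interval.ms, leaving a fraction of the interval (safetyMargin) unused.
    static int maxPollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord, double safetyMargin) {
        if (worstCaseMsPerRecord <= 0 || safetyMargin < 0 || safetyMargin >= 1) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        long budgetMs = (long) (maxPollIntervalMs * (1.0 - safetyMargin));
        // Never go below one record per poll, even if a single record can exceed the budget.
        return (int) Math.max(1, budgetMs / worstCaseMsPerRecord);
    }

    public static void main(String[] args) {
        // 5-minute interval, assumed 5 s worst case per record, half the interval kept in reserve.
        System.out.println(maxPollRecords(300_000, 5_000, 0.5)); // prints 30
    }
}
```

If the result comes out as 1 and a single record can still exceed the budget, shrinking the batch will not help and max.poll.interval.ms itself has to grow.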

#4. The answer lies here:

session.timeout.ms

The amount of time a consumer can be out of contact with the brokers while still considered alive defaults to 3 seconds. If more than session.timeout.ms passes without the consumer sending a heartbeat to the group coordinator, it is considered dead and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group. This property is closely related to heartbeat.interval.ms. heartbeat.interval.ms controls how frequently the KafkaConsumer poll() method will send a heartbeat to the group coordinator, whereas session.timeout.ms controls how long a consumer can go without sending a heartbeat. Therefore, those two properties are typically modified together: heartbeat.interval.ms must be lower than session.timeout.ms, and is usually set to one-third of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than the default will allow consumer groups to detect and recover from failure sooner, but may also cause unwanted rebalances as a result of consumers taking longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher will reduce the chance of accidental rebalance, but also means it will take longer to detect a real failure.
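
As a rough model of how the two timeouts relate, the sketch below assumes a client that sends heartbeats from a background thread (to my knowledge the Java client has done this since Kafka 0.10.1). Under that assumption, slow processing alone does not stop heartbeats: session.timeout.ms catches a crashed or unreachable process, while max.poll.interval.ms catches a process that is alive but stuck between poll() calls. The class `GroupMembershipModel` and its method are hypothetical and simplify what the client and coordinator actually do.

```java
class GroupMembershipModel {
    enum Status { ALIVE, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    // Simplified liveness check for one group member.
    // msSinceLastHeartbeat: time since the last heartbeat reached the coordinator.
    // msSinceLastPoll: time since the application last called poll().
    static Status check(long msSinceLastHeartbeat, long msSinceLastPoll,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        if (msSinceLastHeartbeat > sessionTimeoutMs) {
            return Status.SESSION_EXPIRED;        // process looks dead or unreachable
        }
        if (msSinceLastPoll > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED; // process alive, but processing took too long
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000;   // from the question's configuration
        long maxPollIntervalMs = 300_000; // from the question's configuration

        // Heartbeats still arriving, one minute into processing a batch.
        System.out.println(check(1_000, 60_000, sessionTimeoutMs, maxPollIntervalMs));
        // Heartbeats still arriving, but no poll() for more than five minutes.
        System.out.println(check(1_000, 301_000, sessionTimeoutMs, maxPollIntervalMs));
        // No heartbeat for 11 seconds, for example after a crash.
        System.out.println(check(11_000, 11_000, sessionTimeoutMs, maxPollIntervalMs));
    }
}
```

In this model, a long batch such as the 50+ SQL queries in the question is bounded by max.poll.interval.ms rather than by session.timeout.ms.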

Upvotes: 2
