Vojtěch

Reputation: 12416

Kafka keeps rebalancing consumers

We have 10 consumers in a group listening to a topic. What we see very often is that the consumers get rebalanced (which completely stops the consumer process for some time).

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ParserKafkaPipeline | grep -e ParserBody | sort
ParserBodyToParse 0          99              99              0               consumer-1-f29b7eb7-b871-477c-af52-446fbf4b0496  /10.12.18.58    consumer-1
ParserBodyToParse 1          97              97              0               consumer-10-6639ee02-8e68-40e6-aca1-eabd89bf828e /10.12.18.58    consumer-10
ParserBodyToParse 2          97              97              0               consumer-11-c712db8b-0396-4388-9e3a-e8e342355547 /10.12.18.58    consumer-11
ParserBodyToParse 3          97              98              1               consumer-12-0cc6fe12-d640-4344-91c0-f15e63c20cca /10.12.18.58    consumer-12
ParserBodyToParse 4          97              98              1               consumer-13-b904a958-141d-412e-83ea-950cd51e25e0 /10.12.18.58    consumer-13
ParserBodyToParse 5          97              98              1               consumer-14-7c70ba88-8b8c-4fad-b15b-cf7692a4b9ce /10.12.18.58    consumer-14
ParserBodyToParse 6          98              98              0               consumer-15-f0983c3d-8704-4127-808d-ec8b6b847008 /10.12.18.58    consumer-15
ParserBodyToParse 7          97              97              0               consumer-18-de5d20dd-217c-4db2-9b39-e2fdbca386e9 /10.12.18.58    consumer-18
ParserBodyToParse 8          98              98              0               consumer-5-bdeaf30a-d2bf-4aec-86ea-9c35a7acfe21  /10.12.18.58    consumer-5
ParserBodyToParse 9          98              98              0               consumer-9-4de1bf17-9474-4bd4-ae61-4ab254f52863  /10.12.18.58    consumer-9

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ParserKafkaPipeline | grep -e ParserBody | sort
Warning: Consumer group 'ParserKafkaPipeline' is rebalancing.
ParserBodyToParse 0          99              99              0               -               -               -
ParserBodyToParse 1          99              99              0               -               -               -
ParserBodyToParse 2          99              99              0               -               -               -
ParserBodyToParse 3          99              100             1               -               -               -
ParserBodyToParse 4          99              100             1               -               -               -
ParserBodyToParse 5          99              100             1               -               -               -
ParserBodyToParse 6          100             100             0               -               -               -
ParserBodyToParse 7          99              99              0               -               -               -
ParserBodyToParse 8          100             100             0               -               -               -
ParserBodyToParse 9          100             100             0               -               -               -

Notice the warning in the second call above.

Consuming these messages might take a long time, but it shouldn't take more than two minutes. I checked that the limit on consumer.poll (max.poll.interval.ms) is 5 minutes, so that shouldn't be an issue. Are there any logs where I can check what exactly is happening?

UPDATE:

We use Kafka 2.2.1 and the Java consumer. We didn't change the default values of session.timeout.ms and heartbeat.interval.ms. The consumer is basically waiting for IO from another service, so it is not using any CPU – that is why I expect the heartbeat to be working correctly.
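
For reference, leaving those settings untouched means we are running with the Kafka 2.2.1 defaults; written out explicitly (a sketch only, not code we actually have), that would be:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Sketch: the Kafka 2.2.1 defaults for the timeout-related settings mentioned above,
    // spelled out explicitly. Not our actual configuration.
    fun defaultTimeoutProperties() = Properties().apply {
        put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000)      // session timeout: 10 s
        put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3_000)    // heartbeat interval: 3 s
        put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000)   // max gap between poll() calls: 5 min
        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)           // max records returned per poll
    }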

Our consumer code is the following:

    // Note: `properties`, `pollDurationMinutes`, `consumeRecord` and `log` are defined elsewhere in our code.
    inline fun <reified T : Any> consume(
            topic: KafkaTopic,
            groupId: String,
            batchSize: Int = 50,
            crossinline consume: (key: String?, value: T) -> (Unit)
    ) = thread {
        val consumerProperties = Properties()
        consumerProperties.putAll(properties)
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize)

        val consumer = KafkaConsumer<String?, ByteArray>(consumerProperties)

        consumer.subscribe(listOf(topic.toString()))

        while (true) try {
            // blocks for up to pollDurationMinutes waiting for records
            val records = consumer.poll(Duration.ofMinutes(pollDurationMinutes))
            log.debug("Topic $topic consumed by group $groupId: ${records.count()} records.")
            records.forEach { record -> consumeRecord(record, topic, consume) }
        } catch (e: Exception) {
            log.fatal("Couldn't consume records: ${e.message}.", e)
            // sleep to prevent a logging storm on connection failures
            Thread.sleep(1000)
        }
    }

Upvotes: 17

Views: 18787

Answers (3)

JayPatel

Reputation: 141

For anyone who encounters this even after they feel processing of records is not the bottleneck:

We recently encountered a nasty bug in the Kafka Connect runtime which keeps heartbeat threads alive and spins up more Kafka Connect tasks with the same thread name (essentially, the older task threads and heartbeat threads are never killed).

The following bugs were encountered in version 2.3.1 and a few other versions, as mentioned in the JIRA tickets:

https://issues.apache.org/jira/browse/KAFKA-9841

https://issues.apache.org/jira/browse/KAFKA-10574

https://issues.apache.org/jira/browse/KAFKA-9184

This also happened in Confluent Platform version 5.3.1, so please upgrade your Kafka Connect runtime and connect-api to the latest versions if possible.

Upvotes: 2

Chris

Reputation: 1804

I think the first part of Giorgos' answer is correct, up to "...processing the batch for a long time", but the configuration advice is for a different problem.

There are two causes of a rebalance: too long between polls, or too long between heartbeats. The logs should tell you which one caused the rebalance, but it is usually the former.

If the problem is heartbeats, then the configuration changes advised above may help, and/or increasing session.timeout.ms. The heartbeat runs in a separate thread and allows the group to quickly determine whether a consumer application has died.

If the problem is too long between polls and you can't speed up your processing, then you need to increase the allowed gap between calls to poll, or reduce the number of records you handle on each poll. The relevant properties are max.poll.interval.ms (default 5 minutes) and max.poll.records (default 500).
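
Applied to the consumer setup from the question, tuning the poll side could look roughly like this (a sketch only; the property names are real, but the values are illustrative, not recommendations):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Sketch: allow a longer gap between poll() calls and hand back fewer records
    // per poll, so that each batch can be processed within the allowed interval.
    // The concrete numbers are examples only.
    fun slowProcessingProperties(base: Properties) = Properties().apply {
        putAll(base)
        put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000)  // allow up to 10 minutes between polls
        put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10)           // smaller batches per poll
    }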

Upvotes: 9

Giorgos Myrianthous

Reputation: 39790

Frequent rebalances are usually caused by the consumer taking too long to process batches. While the consumer is busy processing a batch for a long time, heartbeats are not being sent, so the brokers think the consumer was lost and start rebalancing.

I would suggest either creating smaller batches by reducing the value of max.partition.fetch.bytes, or extending the heartbeat timeouts by increasing the value of heartbeat.interval.ms.
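
As a rough sketch of what these two changes could look like in the question's Properties setup (example values only, not tested recommendations):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Sketch: smaller fetches per partition and a longer heartbeat interval.
    // heartbeat.interval.ms must stay well below session.timeout.ms (default 10 s).
    fun rebalanceTuningProperties(base: Properties) = Properties().apply {
        putAll(base)
        put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256 * 1024)  // 256 KB instead of the 1 MB default
        put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000)           // 5 s instead of the 3 s default
    }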

Upvotes: 11
