Nandish Kotadia

Reputation: 461

Kafka Consumer Rebalancing takes too long

I have a Kafka Streams application that consumes data from a few topics, joins the data, and writes the result to another topic.

Kafka Configuration:

5 Kafka brokers
Kafka topics: 15 partitions, replication factor 3.

Note: I am running the Kafka Streams applications on the same machines as my Kafka brokers.

A few million records are consumed/produced every hour. Whenever I take any Kafka broker down, the application goes into rebalancing, which takes approx. 30 minutes or sometimes even longer.

Does anyone have any idea how to solve this rebalancing issue in the Kafka consumer? Also, it often throws the exception below while rebalancing.

This is stopping us from going live in a production environment with this setup. Any help would be appreciated.

    Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
        at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
        at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
        at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
        at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
        at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)

Kafka Streams Config:

    bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092
    max.poll.records=100
    request.timeout.ms=40000
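
For reference, this is roughly how the application is wired up (a simplified sketch, not the full code; the topology is elided and the application id is a placeholder):

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ConversionApp {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conversion-live"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "kafka-1:9092,kafka-2:9092,kafka-3:9092,kafka-4:9092,kafka-5:9092");
            // plain consumer settings are forwarded to the embedded consumers
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);

            KStreamBuilder builder = new KStreamBuilder();
            // ... join topology over the input topics elided ...
            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }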

The ConsumerConfig it creates internally is:

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka-1:9092, kafka-2:9092, kafka-3:9092, kafka-4:9092, kafka-5:9092]
    check.crcs = true
    client.id = conversion-live-StreamThread-1-restore-consumer
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 2147483647
    max.poll.records = 100
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Upvotes: 6

Views: 18846

Answers (2)

sse

Reputation: 1151

In my experience, first, your max.poll.records is too small given your workload of a few million records consumed/produced every hour.

If max.poll.records is too small (say, 1), rebalancing takes very long; I don't know the reason.

Second, make sure the partition counts of your stream app's input topics are consistent. For example, if APP-1 has two input topics A and B, and A has 4 partitions while B has 2, then rebalancing takes very long. However, if A and B both have 4 partitions, even if some of the partitions are idle, the rebalancing time is good. You can check the partition counts with the admin client, as in the sketch below.
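
A rough sketch of such a check (this assumes Kafka 0.11 clients, where AdminClient is available; the topic names are placeholders for your input topics):

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class PartitionCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-1:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // describe both input topics and print their partition counts
                Map<String, TopicDescription> topics =
                        admin.describeTopics(Arrays.asList("topic-A", "topic-B")).all().get();
                topics.forEach((name, desc) ->
                        System.out.println(name + ": " + desc.partitions().size() + " partitions"));
            }
        }
    }

Hope it helps.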

Upvotes: 1

Matthias J. Sax

Reputation: 62350

I would recommend configuring StandbyTasks via the parameter num.standby.replicas=1 (the default is 0). This should help to reduce the rebalance time significantly.
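
For example (only the relevant setting; pass it along with your other Streams properties):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // Keep one standby copy of each task's local state on another instance,
    // so a migrated task can resume from near-current state instead of
    // restoring the whole changelog topic from scratch.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);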

Furthermore, I would recommend upgrading your application to Kafka 0.11. Note that the Streams API in 0.11 is backward compatible with 0.10.1 and 0.10.2 brokers, so you don't need to upgrade your brokers for this. Rebalance behavior was heavily improved in 0.11 and will be improved further in the upcoming 1.0 release (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+interface+for+the+state+store+restoration+process), so upgrading your application to the latest version is always an improvement for rebalancing.

Upvotes: 5
