Zach Pate

Reputation: 41

Kafka Streams - Failed to Rebalance Error

I have a basic Kafka Streams application that reads from an in_topic, performs a rolling aggregate, and performs a join to publish to an out_topic. This has been running fine for weeks, but it crashed this morning and will no longer start. I do not think it has anything to do with the code. The log lines leading up to the error are:

2019-01-21 17:46:32,803 localhost org.apache.kafka.clients.producer.KafkaProducer: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] Instantiated a transactional producer.
2019-01-21 17:46:32,803 localhost org.apache.kafka.clients.producer.KafkaProducer: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] Overriding the default acks to all since idempotence is enabled.
2019-01-21 17:46:32,818 localhost org.apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.0
2019-01-21 17:46:32,818 localhost org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 3402a8361b734732
2019-01-21 17:46:32,832 localhost org.apache.kafka.clients.producer.internals.TransactionManager: [Producer clientId=rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1-0_0-producer, transactionalId=rtt-healthscore-stream-0_0] ProducerId set to -1 with epoch -1
2019-01-21 17:47:32,833 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Error caught during partition assignment, will abort the current process and re-throw at the end of rebalance: {}
org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
2019-01-21 17:47:32,843 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] partition assignment took 60062 ms.
    current active tasks: []
    current standby tasks: []
    previous active tasks: []

2019-01-21 17:47:32,845 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
2019-01-21 17:47:32,845 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Shutting down
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.KafkaStreams: stream-client [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804] State transition from REBALANCING to ERROR
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.KafkaStreams: stream-client [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804] All stream threads have died. The instance will be in error state and should be closed.
2019-01-21 17:47:32,860 localhost org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Shutdown complete
Exception in thread "rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [rtt-healthscore-stream-7d679951-913b-4976-a43e-0b437c22c804-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:870)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:810)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

None of the Kafka settings/configs have changed, and all of the brokers are available. My Kafka version is 2.0. I am able to read from the in_topic with the console consumer, so everything upstream of this application is fine. All help is appreciated.

Upvotes: 3

Views: 6771

Answers (3)

Arun

Reputation: 1085

This was resolved after upgrading to Kafka 2.5.0. See:
https://kafka.apache.org/25/documentation/streams/developer-guide/write-streams.html

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
<!-- Optionally include Kafka Streams DSL for Scala for Scala 2.12 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-scala_2.12</artifactId>
    <version>2.5.0</version>
</dependency>

Upvotes: 0

Thomas Dickinson

Reputation: 15

We also got this error after an upgrade to 2.1 (and, I think, also when we previously upgraded to earlier versions).

We run in a Kubernetes environment where, after a rolling upgrade, brokers may change IP address. From the broker log:

[2019-02-20 02:20:20,085] WARN [TransactionCoordinator id=1001] Connection to node 0 (khaki-joey-kafka-0.khaki-joey-kafka-headless.hyperspace-dev/10.233.124.181:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2019-02-20 02:20:57,205] WARN [TransactionCoordinator id=1001] Connection to node 1 (khaki-joey-kafka-1.khaki-joey-kafka-headless.hyperspace-dev/10.233.122.67:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

I can see the transaction coordinator is still using stale IP addresses for the two brokers that were restarted after it (a day after the upgrade).

Possible options:

  • As this answer says, switch off exactly-once for your Streams application. It then doesn't use transactions and everything seems to work. This is not helpful if you require EOS or if other client code requires transactions.
  • Restart any brokers that are reporting warnings to force them to re-resolve the IP addresses. They would need to be restarted in a way that doesn't change their own IP address, which is not usually possible in Kubernetes.

Defect raised: KAFKA-7958 - Transactions are broken with kubernetes hosted brokers

Update 2019-02-20: This may have been resolved in Kafka 2.1.1 (Confluent 5.1.2), released today. See the linked issue.

Upvotes: 1

Zheng Lu

Reputation: 71

Our project hit the same timeout failure after we upgraded to Kafka 2.1, and we don't know the reason yet.

Our temporary workaround is to disable the exactly_once config, which skips initializing the transactional state.
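
For anyone who wants to try the workaround, here is a minimal sketch of what it might look like in the Streams configuration. It uses plain string keys so it compiles without the Kafka jars; in a real application you would probably use the matching StreamsConfig constants instead. The class and method names are made up for illustration, the application id is inferred from the transactionalId in the question's log, and localhost:9092 is only a placeholder for your brokers.

```java
import java.util.Properties;

public class StreamsGuaranteeSketch {

    // Hypothetical helper: builds Streams properties with exactly-once switched off.
    static Properties atLeastOnceConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // "at_least_once" is the default guarantee; it is set explicitly here
        // to replace an existing "exactly_once" setting, so no transactional
        // producer is created and initializing transactional state is skipped.
        props.put("processing.guarantee", "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: adjust the application id and broker list to your setup.
        Properties props = atLeastOnceConfig("rtt-healthscore-stream", "localhost:9092");
        System.out.println("processing.guarantee = " + props.getProperty("processing.guarantee"));
    }
}
```

You would pass the resulting Properties to your KafkaStreams instance as usual. Keep in mind that with at-least-once processing, records can be reprocessed after a failure, so this is only suitable if your aggregate and join can tolerate duplicates.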

Upvotes: 7
