Reputation: 312
I have a Kafka Streams application consuming from and producing to a Kafka cluster with 3 brokers and a replication factor of 3. Apart from the consumer offsets topic (50 partitions), every other topic has a single partition.
When the brokers attempt a preferred replica election, the Streams app (which is running on a completely different instance than the brokers) fails with the error:
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
...
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Is it normal that the Streams app attempts to be the leader for the partition, given that it's running on a server that's not part of the Kafka cluster?
I can reproduce this behaviour on demand by running:
bin/kafka-preferred-replica-election.sh --zookeeper localhost
My issue seems similar to this reported failure, so I'm wondering if this is a new Kafka Streams bug. My full stack trace is exactly the same as the one in the gist linked from that report (here).
Another potentially interesting detail is that during the leader election I get these messages in the controller.log of the broker:
[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
I initially thought this connection error was to blame, but after the leader election crashes the Streams app, restarting the app (without touching the brokers at all) makes it work normally again until the next election.
All servers (3 Kafka brokers and the Streams app) are running on EC2 instances.
Upvotes: 10
Views: 3525
Reputation: 244
This is now fixed in Kafka 0.10.2.1. If you can't upgrade to that version, make sure you have these two parameters set as follows in your Streams config:
final Properties props = new Properties();
...
// Retry transient send failures (e.g. NotLeaderForPartitionException during a leader election)
// instead of failing the task immediately.
props.put(ProducerConfig.RETRIES_CONFIG, 10);
// Effectively disable the poll-interval timeout so the consumer is not kicked out of the group
// while the task is blocked.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
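For context, here is a minimal sketch of where those two settings sit in a complete Streams configuration on the 0.10.x API (KStreamBuilder). The application id, bootstrap server and topic names are placeholders for illustration only:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class MyStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
        // Producer: retry sends while partition leadership moves between brokers.
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Consumer: don't drop out of the group if processing between polls takes long.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");                   // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

Client configs put directly into the Streams properties (as above and in the snippet in this answer) are forwarded to the embedded producer and consumer.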
Upvotes: 10