Reputation: 1
tl;dr
There are two k8s clusters, one Kafka cluster with three brokers, and a topic with 6 partitions. The same version of a service runs on both k8s clusters. On the first one (k8s_1), everything has been fine for the whole observation period. On the second one (k8s_2) I get errors like "Received invalid metadata error in produce request on partition TOPIC-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException....". Messages are sometimes delivered successfully to only one broker, sometimes to two.
There are several different services of a similar kind, all connected to the same Kafka cluster. All of them work on k8s_1; on k8s_2, all of them fail in the same way.
details
Kafka nodes
Distribution of leaders for the 6 partitions, from Offset Explorer:
log:
[Producer clientId=producer-1] Nodes with data ready to send: [1.1.1.232:9092 (id: 1 rack: null)]
[Producer clientId=producer-1] Sent produce request to 1: (type=ProduceRequest, acks=-1, timeout=30000, partitionRecords=([PartitionProduceData(index=4, records=MemoryRecords(size=1303, buffer=java.nio.HeapByteBuffer[pos=0 lim=1303 cap=1303]))]), transactionalId=''
[Producer clientId=producer-1] Received produce response from node 1 with correlation id 11119
[Producer clientId=producer-1] Got error produce response with correlation id 11119 on topic-partition TOPIC-4, retrying (2147482932 attempts left). Error: NOT_LEADER_OR_FOLLOWER
[Producer clientId=producer-1] Received invalid metadata error in produce request on partition TOPIC-4 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
[Producer clientId=producer-1] Updating last seen epoch for partition TOPIC-1 from 0 to epoch 0 from new metadata
[Producer clientId=producer-1] Updating last seen epoch for partition TOPIC-2 from 0 to epoch 0 from new metadata
[Producer clientId=producer-1] Updating last seen epoch for partition TOPIC-0 from 0 to epoch 0 from new metadata
[Producer clientId=producer-1] Updating last seen epoch for partition TOPIC-3 from 0 to epoch 0 from new metadata
[Producer clientId=producer-1] Updating last seen epoch for partition TOPIC-5 from 0 to epoch 0 from new metadata
[Producer clientId=producer-1] Updating last seen epoch for partition TOPIC-4 from 0 to epoch 0 from new metadata
[Producer clientId=producer-1] Updated cluster metadata updateVersion 4992 to MetadataSnapshot{clusterId='JBNBUWDLRIOjQHAEqIM2FA', nodes={1=1.1.1.232:9092 (id: 1 rack: null), 2=1.1.1.233:9092 (id: 2 rack: null), 3=1.1.1.234:9092 (id: 3 rack: null)}, partitions=[PartitionMetadata(error=NONE, partition=TOPIC-3, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,1,2, isr=3,1,2, offlineReplicas=), PartitionMetadata(error=NONE, partition=TOPIC-2, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,1,2, isr=3,1,2, offlineReplicas=), PartitionMetadata(error=NONE, partition=TOPIC-5, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,3,1, isr=2,3,1, offlineReplicas=), PartitionMetadata(error=NONE, partition=TOPIC-4, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,2,3, isr=1,2,3, offlineReplicas=), PartitionMetadata(error=NONE, partition=TOPIC-1, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,3,1, isr=2,3,1, offlineReplicas=), PartitionMetadata(error=NONE, partition=TOPIC-0, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,2,3, isr=1,2,3, offlineReplicas=)], controller=1.1.1.234:9092 (id: 3 rack: null)}
[Producer clientId=producer-1] Nodes with data ready to send: [1.1.1.232:9092 (id: 1 rack: null)]
...
[Producer clientId=producer-1] Got error ....
If I understand correctly: we tried to send a message to node 1 (1.1.1.232:9092) and got a NOT_LEADER_OR_FOLLOWER error for partition TOPIC-4, meaning the broker we sent to is not the leader. But the refreshed metadata shows that the leader for TOPIC-4 really is node 1. So even though we are sending to the correct leader, we keep getting the same error.
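To double-check what the client itself sees, the partition leadership can be dumped with the plain Kafka AdminClient from inside a k8s_2 pod. A rough sketch (the bootstrap address is taken from the log above; everything else is illustrative and not from the real service):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

static void printLeaders() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "1.1.1.232:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        TopicDescription desc = admin.describeTopics(List.of("TOPIC"))
                .allTopicNames().get()   // on clients older than 3.1, use .all()
                .get("TOPIC");
        // print leader and ISR per partition, to compare with Offset Explorer
        desc.partitions().forEach(p ->
                System.out.println("partition " + p.partition()
                        + " leader=" + p.leader() + " isr=" + p.isr()));
    }
}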
Sometimes connections to the brokers are re-established, which changes the set of nodes the service can successfully send to. From the SendResult you can see which partitions are currently accepting messages; if you send messages to those partitions explicitly, they are delivered successfully:
var record = new ProducerRecord<>("TOPIC", successPartition, uuid, message);
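To see which partitions currently accept writes from k8s_2, each partition can be probed explicitly. A quick sketch (reuses the kafkaTemplate from the service; the topic name and 6-partition count are from the setup above, the probe payload and logger are illustrative):

// assumes: import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.UUID;
// an injected KafkaTemplate<String, String> kafkaTemplate and an slf4j logger "log"
for (int partition = 0; partition < 6; partition++) {
    var record = new ProducerRecord<>("TOPIC", partition, UUID.randomUUID().toString(), "probe");
    try {
        SendResult<String, String> result = kafkaTemplate.send(record).get(2000, MILLISECONDS);
        log.info("partition {} ok, offset {}", partition, result.getRecordMetadata().offset());
    } catch (Exception e) {
        log.warn("partition {} failed: {}", partition, e.getMessage());
    }
}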
The issue seems unrelated to the code: it is minimal and identical to what is running successfully on the other k8s cluster (k8s_1) against the same Kafka cluster.
I have stripped the service down to almost nothing. It now sends one message on a schedule, roughly once a minute. The code is standard, straight from the documentation:
var record = new ProducerRecord<>("TOPIC", uuid, message);
SendResult<String, String> result = kafkaTemplate.send(record).get(2000, MILLISECONDS);
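For completeness, the "standard" producer setup behind kafkaTemplate is roughly the usual Spring Kafka configuration sketched below. This is not the real service's config: the bean names, bootstrap addresses and String serializers are assumptions for illustration.

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
class KafkaProducerConfig {

    @Bean
    ProducerFactory<String, String> producerFactory() {
        // acks=all matches the acks=-1 visible in the produce request log above
        Map<String, Object> props = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "1.1.1.232:9092,1.1.1.233:9092,1.1.1.234:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.ACKS_CONFIG, "all");
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}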
Any ideas on what to check in the code, Kubernetes, Kafka, infrastructure...?
Upvotes: 0
Views: 78
Reputation: 1
After disabling istio-proxy for the services that connect to Kafka, everything started working. On the other environment it all works fine without disabling it; why that is so is not yet clear.
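For anyone hitting the same thing: assuming the Istio sidecar intercepting the Kafka traffic really is the culprit, the proxy can be turned off for just the affected workloads with a pod-template annotation, or the Kafka port can be excluded from redirection. A sketch of the deployment fragment (not copied from the real manifests):

# hypothetical deployment fragment, disables sidecar injection for this workload
spec:
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
        # or, to keep the sidecar but bypass it for the Kafka port:
        # traffic.sidecar.istio.io/excludeOutboundPorts: "9092"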
Upvotes: 0