Reputation: 508
I am attempting to test out a producer writing messages to a topic on a kafka cluster using the Golang client. This works fine writing to a topic on a local cluster, I just copied and pasted the example code from their github repo.
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "test"
for _, word := range []string{"test message"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
I receive the message on my console-consumer no issues.
I then try to do the same thing, just using my remote kafka cluster topic (note I also tried without the ports in the strings):
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092"})
it prints the following error:
Delivery failed: test[0]@end(Broker: Not enough in-sync replicas)
The console producer has no issues though:
./bin/kafka-console-producer.sh --broker-list HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test
>proving that this works
The console-consumer receives it:
bin/kafka-console-consumer.sh --bootstrap-server HOSTNAME.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test --from-beginning
proving that this works
Last thing I did was check to see how many In-Sync replicas there were for that topic. If I am reading this correctly, the min should be 2 and there are 3.
./bin/kafka-topics.sh --describe --bootstrap-server HOSTNAME1.amazonaws.com:9092,HOSTNAME2.amazonaws.com:9092,HOSTNAME3.amazonaws.com:9092 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:min.insync.replicas=2,flush.ms=10000,segment.bytes=1073741824,retention.ms=86400000,flush.messages=9223372036854775807,max.message.bytes=1000012,min.cleanable.dirty.ratio=0.5,unclean.leader.election.enable=true,retention.bytes=-1,delete.retention.ms=86400000,segment.ms=604800000
Topic: test Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Any ideas of what else I could look into?
Upvotes: 2
Views: 11284
Reputation: 33
Or if you're using AWS's MSK, this could arise when the EBS storage per broker is completely used for one of the broker and the possible way to overcome is to increase it's storage.
Upvotes: 2
Reputation: 191874
You have min.insync.replicas=2
, but the topic only has one replica.
If you have request.required.acks=all
(which is the default), then the producer will fail because it cannot replicate what you've produced to the leader broker to the minimum set of required replicas
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties
I believe console producer only sets that property to just 1
there are 3
There's actually only one. That's broker ID 3. You'd see a total of three separate numbers there as ISR if there were actually three replicas
Upvotes: 9