Reputation: 11
I'm validating Protobuf records against a schema stored in the Kafka Schema Registry. The problem is that even though I registered the correct schema, producing still fails with the error "Broker: Broker failed to verify record".
The schema I set and the data I send are as follows.
syntax = "proto3";

option go_package = "somepackage";

package mypackage;

// Message is the record value published to the topic: a code plus a
// human-readable description. It is the first (index 0) top-level message
// in this schema file.
message Message {
  string code = 1;
  string description = 2;
}
Additionally, I am sending the data from Go; the producer code is as follows.
func main() {
topic := "mytopic"
conf := ProducerConfig()
producer, err := kafka.NewProducer(&conf)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
defer producer.Close()
testMsg := mypackage.Message{Code: "1001", Description: "This is an error Message"}
// serialize the protobuf message
payload, err := proto.Marshal(&testMsg)
if err != nil {
log.Fatalf("Error occurred while marshalling the protobuf message : %v", err)
schemaClient := srclient.CreateSchemaRegistryClient("schemaRegistryUrl")
schemaClient.SetCredentials("username", "password")
// fetch or register schema
subject := topic + "-value"
schema, err := schemaClient.GetLatestSchema(subject)
if err != nil {
log.Fatalf("failed to fetche the schema: %v", err)
schemanId := int32(schema.ID())
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, schemanId)
// Optional delivery channel, if not specified the Producer object's
// .Events channel is used.
deliveryChan := make(chan kafka.Event)
err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: buf.Bytes()},
if err != nil {
log.Fatalf("Failed to produce message: %v", err)
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
log.Println("Message Produced successfully")
Upvotes: 1
Views: 32