Sudip Sikdar
Sudip Sikdar

Reputation: 11

Kafka schema registry go Protobuf - Broker: Broker failed to validate record

I'm verifying the Protobuf schema using the Kafka schema registry. The problem is that even though I put in the correct schema, I still get the error Broker: Broker failed to verify record.

The schema i set and the data i send are as follows.

syntax = "proto3";

option go_package = "somepackage";

package mypackage;

message Message {
    string code = 1;
    string description = 2;
}

Additionally, I am sending data using go and the code for the data is as follows.

func main() {
    topic := "mytopic"

    conf := ProducerConfig()

    producer, err := kafka.NewProducer(&conf)

    if err != nil {

        log.Fatalf("Failed to create producer: %v", err)
    }

    defer producer.Close()

    testMsg := mypackage.Message{Code: "1001", Description: "This is an error Message"}

    // serialize the protobuf message

    payload, err := proto.Marshal(&testMsg)

    if err != nil {
        log.Fatalf("Error occurred while marshalling the protobuf message : %v", err)
    }

    schemaClient := srclient.CreateSchemaRegistryClient("schemaRegistryUrl")
    schemaClient.SetCredentials("username", "password")

    // fetch or register schema
    subject := topic + "-value"

    schema, err := schemaClient.GetLatestSchema(subject)

    if err != nil {
        log.Fatalf("failed to fetche the schema: %v", err)

    }

    schemanId := int32(schema.ID())
    buf := new(bytes.Buffer)
    binary.Write(buf, binary.BigEndian, schemanId)
    buf.Write(payload)

    // Optional delivery channel, if not specified the Producer object's
    // .Events channel is used.
    deliveryChan := make(chan kafka.Event)

    err = producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          buf.Bytes()},
        deliveryChan)

    if err != nil {
        log.Fatalf("Failed to produce message: %v", err)
    }

    e := <-deliveryChan
    m := e.(*kafka.Message)

    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }

    log.Println("Message Produced successfully")

    close(deliveryChan)
}

Upvotes: 1

Views: 28

Answers (0)

Related Questions