Hossein Mayboudi
Hossein Mayboudi

Reputation: 505

Kafka Protobuf Console Consumer Serialization Exception

I have a problem regarding the #protobuf #serialization which occurs in #nodejs and #apache #kafka run by #confluent platform all in one community.

I serialize the data with google protobuf or protobufjs and send it to kafka with kafkajs. However, when I submit data, kafka-protobuf-console-consumer gives me a serialization exception. Please check the source code and help me out. https://github.com/smhmayboudi/kafka-protobuf-console

org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 53
Caused by: java.lang.IllegalArgumentException: Invalid message indexes: io.confluent.kafka.schemaregistry.protobuf.MessageIndexes@59d77850
    at io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema.toMessageName(ProtobufSchema.java:903)
    at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:119)
    at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:98)
    at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:130)
    at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:104)
    at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter.writeTo(ProtobufMessageFormatter.java:88)
    at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:173)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:118)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Upvotes: 1

Views: 1995

Answers (1)

Hossein Mayboudi
Hossein Mayboudi

Reputation: 505

I find the solution by discussing in confluent slack with help of Gerard Klijs. Thanks, to Gerard Klijs and Ricardo Ferreira. Plus I update the git repo.

The serializer formatting between java and other language is different. So you have to follow this formating style: [Magic Byte] + [Schema ID] + [Message Index Data] + [Message Payload], which Message Index Data is zero.

source: https://riferrei.com/2020/07/09/data-sharing-between-java-go-using-kafka-and-protobuf/

Upvotes: 3

Related Questions