Reputation: 505
I have a problem with Protobuf serialization in Node.js when producing to Apache Kafka, which runs on the Confluent Platform all-in-one community setup.
I serialize the data with google-protobuf or protobufjs and send it to Kafka with kafkajs. However, when I send data, kafka-protobuf-console-consumer throws a SerializationException. Please check the source code and help me out: https://github.com/smhmayboudi/kafka-protobuf-console
org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 53
Caused by: java.lang.IllegalArgumentException: Invalid message indexes: io.confluent.kafka.schemaregistry.protobuf.MessageIndexes@59d77850
at io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema.toMessageName(ProtobufSchema.java:903)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:119)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:98)
at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:130)
at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter$ProtobufMessageDeserializer.deserialize(ProtobufMessageFormatter.java:104)
at io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter.writeTo(ProtobufMessageFormatter.java:88)
at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:173)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:118)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Upvotes: 1
Views: 1995
Reputation: 505
I found the solution by discussing it in the Confluent Slack with the help of Gerard Klijs. Thanks to Gerard Klijs and Ricardo Ferreira. I have also updated the Git repo.
The Confluent Java serializer frames each message differently from what a plain Protobuf encoder in another language produces, so the Java deserializer cannot read a bare Protobuf payload. You have to follow this format: [Magic Byte] + [Schema ID] + [Message Index Data] + [Message Payload], where the Message Index Data is zero.
source: https://riferrei.com/2020/07/09/data-sharing-between-java-go-using-kafka-and-protobuf/
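A minimal sketch of that framing in Node.js, assuming the payload has already been encoded (for example with protobufjs `encode(...).finish()`) and that the message is the first top-level message in the registered schema, so the message index data is a single zero byte. The function name `frameConfluentProtobuf` is hypothetical, and the schema ID 53 is simply the one from the stack trace above. The magic byte is 0 and the schema ID is written as a 4-byte big-endian integer.

```javascript
// Hypothetical helper: prepends the Confluent framing to an encoded Protobuf payload.
// Layout: [magic byte 0][schema ID, 4 bytes big-endian][message index 0][payload]
function frameConfluentProtobuf(schemaId, payload) {
  const header = Buffer.alloc(6);
  header.writeUInt8(0, 0);          // magic byte
  header.writeInt32BE(schemaId, 1); // schema ID from Schema Registry
  header.writeUInt8(0, 5);          // message index data: 0 = first message in the schema (assumption)
  return Buffer.concat([header, Buffer.from(payload)]);
}

// Example payload bytes standing in for the output of a Protobuf encoder.
const payload = Buffer.from([0x0a, 0x03, 0x66, 0x6f, 0x6f]);
const framed = frameConfluentProtobuf(53, payload);
console.log(framed.toString('hex'));
```

The resulting buffer would then be passed as the message `value` to the kafkajs producer instead of the bare Protobuf bytes. If the message is not the first one in the .proto file, or is nested, the index data is no longer a single zero byte and this sketch does not apply.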
Upvotes: 3