hellzone
hellzone

Reputation: 5246

How to create a generic deserializer for bytes in Kafka?

I have a proto file with multiple message types. I want to create a generic deserializer for these messages.

Do I need to send these message's types with Kafka message header so Consumer can deserialize these messages with this type information? Is this a best practice or is there any other solution?

deserialize method example;

public Object deserialize(String topic, Headers headers, byte[] data) {
    if(headers[0].equals("Person")){
        return Person.parseFrom(data);
    } else if....
}

my proto file;

message Person {
    uint64 number = 1;
    string name = 2;
}

message Event {
    string msg = 1;
    code = 2;
}

message Data {
    string inf = 1;
    string desc = 2;
}

....

Upvotes: 1

Views: 1956

Answers (1)

JavaTechnical
JavaTechnical

Reputation: 9357

Do I need to send these message's types with Kafka message header so Consumer can deserialize these messages with this type information?

If your KafkaConsumer consumes messages only from a certain topic with only a certain type (class) of messages, then you can configure the class as such in the deserializer configurations like, for example, value.class, key.class etc in your configuration which you can get using the configure() in Deserializer using configs.get("value.class") or configs.get("key.class") and then store them in the member variables.

void configure(java.util.Map<java.lang.String,?> configs,
               boolean isKey)

In case your topic contains different types of messages or your consumer subscribes to different topics each with different types of messages, then storing the class in the Headers should be appropriate.

Another alternative is to write a wrapper class.

class MessageWrapper {
   private Class messageClass;
   private byte[] messageData;
   ProtobufSchema schema;
}

and then in the data you can de-serialize the MessageWrapper. Here the messageData type could be a Person, Data, or Event and the messageClass should help you in parsing. For example,

mapper.readerFor(messageWrapper.getMessageClass())
   .with(messageWrapper.getSchema())
   .readValue(messageWrapper.getMessageData());

Once you get the object, you can check if it is instanceof Person or Event or Data

You may also look at Generating Protobuf schema from POJO definition and omit the schema field in the MessageWrapper

Snippet

ProtobufMapper mapper = new ProtobufMapper()
ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass())
NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();

String asProtofile = nativeProtobufSchema.toString();

Upvotes: 2

Related Questions