Reputation: 5246
I have a proto file with multiple message types. I want to create a generic deserializer for these messages.
Do I need to send each message's type in a Kafka message header so that the consumer can deserialize the message using this type information? Is this a best practice, or is there another solution?
Example deserialize method:
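public Object deserialize(String topic, Headers headers, byte[] data) {
    // Headers cannot be indexed like an array; look the header up by key.
    // The key "type" is an assumed name chosen by the producer.
    String type = new String(headers.lastHeader("type").value(), StandardCharsets.UTF_8);
    if ("Person".equals(type)) {
        return Person.parseFrom(data);
    } else if....
}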
My proto file:
message Person {
    uint64 number = 1;
    string name = 2;
}
message Event {
    string msg = 1;
    code = 2;
}
message Data {
    string inf = 1;
    string desc = 2;
}
....
Upvotes: 1
Views: 1956
Reputation: 9357
Do I need to send each message's type in a Kafka message header so that the consumer can deserialize the message using this type information?
If your KafkaConsumer consumes messages from a single topic that carries only one type (class) of message, you can set that class in the deserializer configuration, for example under a property such as value.class or key.class. You can then read it in the Deserializer's configure() method with configs.get("value.class") or configs.get("key.class") and store it in a member variable.
void configure(java.util.Map<java.lang.String,?> configs, boolean isKey)
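As a rough sketch of that idea (not tied to the real Kafka or protobuf classes, so it can run on its own): the class below mirrors the configure() signature, resolves the class configured under value.class or key.class, and calls its static parseFrom(byte[]) via reflection. StubMessage and ConfiguredClassDeserializer are hypothetical names; StubMessage only stands in for a generated protobuf class such as Person.

```java
import java.lang.reflect.Method;
import java.util.Map;

/** Hypothetical stand-in for a generated protobuf class with a static parseFrom(byte[]). */
final class StubMessage {
    final int size;

    private StubMessage(int size) {
        this.size = size;
    }

    public static StubMessage parseFrom(byte[] data) {
        return new StubMessage(data.length);
    }
}

/** Sketch of a deserializer whose target class comes from configuration. */
final class ConfiguredClassDeserializer {
    private Method parseFrom;

    // Mirrors Deserializer.configure(Map<String, ?> configs, boolean isKey).
    public void configure(Map<String, ?> configs, boolean isKey) {
        String property = isKey ? "key.class" : "value.class";
        Object value = configs.get(property);
        if (value == null) {
            throw new IllegalArgumentException("missing config: " + property);
        }
        try {
            // Accept either a Class object or a fully qualified class name.
            Class<?> target = (value instanceof Class)
                    ? (Class<?>) value
                    : Class.forName(value.toString());
            parseFrom = target.getMethod("parseFrom", byte[].class);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("cannot use class from " + property, e);
        }
    }

    public Object deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Static method, so the receiver is null.
            return parseFrom.invoke(null, (Object) data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("parseFrom failed", e);
        }
    }
}
```

In a real consumer the same logic would sit inside a class implementing Deserializer, and the property would be set in the consumer configuration next to the deserializer class itself.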
If your topic contains different types of messages, or your consumer subscribes to several topics that each carry a different message type, then storing the class in the Headers is appropriate.
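A minimal sketch of the header approach, written without Kafka or protobuf dependencies so it runs standalone: a registry maps the type name to a parser, which avoids a long if/else chain. TypeDispatcher and PayloadParser are hypothetical names. In a real Deserializer the header bytes would come from something like headers.lastHeader("type").value() (the key "type" being an assumption), and the parsers would be method references such as Person::parseFrom.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Parses a payload; declared with "throws Exception" so Person::parseFrom would fit. */
interface PayloadParser {
    Object parse(byte[] data) throws Exception;
}

/** Picks a parser based on the type name sent alongside the payload. */
final class TypeDispatcher {
    private final Map<String, PayloadParser> parsers = new HashMap<>();

    TypeDispatcher register(String typeName, PayloadParser parser) {
        parsers.put(typeName, parser);
        return this;
    }

    /** headerValue holds the raw bytes of the (assumed) "type" header. */
    Object deserialize(byte[] headerValue, byte[] data) {
        if (headerValue == null) {
            throw new IllegalArgumentException("missing type header");
        }
        String typeName = new String(headerValue, StandardCharsets.UTF_8);
        PayloadParser parser = parsers.get(typeName);
        if (parser == null) {
            throw new IllegalArgumentException("unknown message type: " + typeName);
        }
        try {
            return parser.parse(data);
        } catch (Exception e) {
            throw new IllegalStateException("failed to parse " + typeName, e);
        }
    }
}
```

With the generated classes available, registration might look like new TypeDispatcher().register("Person", Person::parseFrom).register("Event", Event::parseFrom), so adding a new message type is one more register call.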
Another alternative is to write a wrapper class.
class MessageWrapper {
    private Class<?> messageClass;   // type of the wrapped message
    private byte[] messageData;      // serialized message bytes
    private ProtobufSchema schema;
    // getters omitted
}
Then you can deserialize the data into a MessageWrapper. Here messageData could hold a Person, Data, or Event, and messageClass tells you how to parse it.
For example,
mapper.readerFor(messageWrapper.getMessageClass())
.with(messageWrapper.getSchema())
.readValue(messageWrapper.getMessageData());
Once you have the object, you can check whether it is an instanceof Person, Event, or Data.
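To make the wrapper idea concrete without pulling in Jackson, here is a small standalone sketch of an envelope that carries the type name next to the payload in a single byte array. This is an assumption about how the wrapper could be laid out on the wire, not what ProtobufMapper produces; MessageEnvelope is a hypothetical name, and the class is carried as a name string rather than a Class object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Envelope layout: UTF type name, payload length, payload bytes. */
final class MessageEnvelope {
    final String typeName;
    final byte[] payload;

    MessageEnvelope(String typeName, byte[] payload) {
        this.typeName = typeName;
        this.payload = payload;
    }

    /** Producer side: pack the type name and payload into one byte array. */
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(typeName);
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Consumer side: read the type name first, then the payload it describes. */
    static MessageEnvelope fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String typeName = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new MessageEnvelope(typeName, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After fromBytes, the consumer would pick the parser from typeName (for example Person.parseFrom for "Person") and can then use the instanceof checks on the result.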
You may also look at "Generating Protobuf schema from POJO definition" and omit the schema field in the MessageWrapper.
Snippet:
ProtobufMapper mapper = new ProtobufMapper();
ProtobufSchema schemaWrapper = mapper.generateSchemaFor(messageWrapper.getMessageClass());
NativeProtobufSchema nativeProtobufSchema = schemaWrapper.getSource();
String asProtofile = nativeProtobufSchema.toString();
Upvotes: 2