Reputation: 87
In our application the producer is sending different data types and the it may happen that a partition can have different datatype objects as we didn't want to partition based on datatype.
In kafka Streams I was trying to use headers. Producer is adding header to the BytesObject and pushing the data to kafka.
Header are say, a particular dataType(customObject). Now based of header I want to parse deserialize the BytesObject received in kafka streams but I am bounded by using processorInterface where I have to pass the actual deserializer
Is there any way I don't have to specify the deserialize beforehand then based on header in processorContext for a record received I can deserialize the Objects
public class StreamHeaderProcessor extends AbstractProcessor<String, Bytes>{
@Override
public void process(String key, Bytes value) {
Iterator<Header> it = context().headers().iterator();
while (it.hasNext()) {
Header head = it.next();
if (head.key().equals("dataType")) {
String headerValue = new String(head.value());
if (headerValue.equals("X")) {
} else if(headerValue.equals("Y")) {
}
}
}
}
}
Upvotes: 1
Views: 2849
Reputation: 62330
If you don's set Serdes in StreamsConfig
and don't set Serdes on builder.stream(..., Consumed.with(/*Serdes*/))
Kafka Streams will use ByteArraySerde
by default and thus key and value is copied into byte[]
arrays as data types. (Similar for using Processor API, and don't set a Serde on topology.addSource(...)
.)
Thus, you can apply a Processor
or Transformer
on the data stream, inspect the header and call the corresponding deserializer in your own code. You need to know all possible data type in advance.
public class MyProcessor implements Processor {
// add corresponding deserializers for all expected types (eg, String)
private StringDeserializer stringDeserializer = new StringDeserializer();
// other methods omitted
void process(byte[] key, byte[] value) {
// inspect header
if (header.equals("StringType") {
// get `context` via `init()` method
String stringValue = stringDeserializer.deserialize(context.topic(), value);
// similar for `key`
// apply processing logic for String type
}
}
}
Upvotes: 2