vineet gandhi
vineet gandhi

Reputation: 87

Kafka streams headers support

In our application the producer is sending different data types and the it may happen that a partition can have different datatype objects as we didn't want to partition based on datatype.

In kafka Streams I was trying to use headers. Producer is adding header to the BytesObject and pushing the data to kafka.

Header are say, a particular dataType(customObject). Now based of header I want to parse deserialize the BytesObject received in kafka streams but I am bounded by using processorInterface where I have to pass the actual deserializer

Is there any way I don't have to specify the deserialize beforehand then based on header in processorContext for a record received I can deserialize the Objects

public class StreamHeaderProcessor extends AbstractProcessor<String, Bytes>{

    @Override
    public void process(String key, Bytes value) {
        Iterator<Header> it = context().headers().iterator();
        while (it.hasNext()) {
            Header head = it.next();
            if (head.key().equals("dataType")) {
                String headerValue = new String(head.value());
                if (headerValue.equals("X")) {

                } else if(headerValue.equals("Y")) {

                }
            }
        }
    }
}

Upvotes: 1

Views: 2849

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62330

If you don's set Serdes in StreamsConfig and don't set Serdes on builder.stream(..., Consumed.with(/*Serdes*/)) Kafka Streams will use ByteArraySerde by default and thus key and value is copied into byte[] arrays as data types. (Similar for using Processor API, and don't set a Serde on topology.addSource(...).)

Thus, you can apply a Processor or Transformer on the data stream, inspect the header and call the corresponding deserializer in your own code. You need to know all possible data type in advance.

public class MyProcessor implements Processor {
    // add corresponding deserializers for all expected types (eg, String)
    private StringDeserializer stringDeserializer = new StringDeserializer();

    // other methods omitted

    void process(byte[] key, byte[] value) {
        // inspect header
        if (header.equals("StringType") {
            // get `context` via `init()` method
            String stringValue = stringDeserializer.deserialize(context.topic(), value);
            // similar for `key`

            // apply processing logic for String type
        }
    }

}

Upvotes: 2

Related Questions