rajesh kumar

Reputation: 121

Kafka Streams custom header on producer

I have multiple events in a topic and I am trying to process them in these steps:

  1. Filter the events based on a header value
  2. Apply the deserializer
  3. Group by key
  4. Aggregate to produce a new KTable
  5. Stream the new KTable back to the same topic as a new event with a new header

I can access the headers using transformValues, but I am not sure how to inject new header values when calling toStream.

streamsBuilder.stream("my-topic")
    .transformValues(new Transformer()) // access headers here and filter out some events
    .groupByKey(Serialized.with(Serdes.String(), null))
    .aggregate(() -> /* my Avro object initialization */,
        (key, value, aggregate) -> newValue(value, aggregate),
        Materialized.as("my-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.ByteArray()))
    .mapValues((key, value) -> /* convert to bytes */)
    .toStream();

NB: I am new to KStream.

Upvotes: 1

Views: 2232

Answers (1)

Nishu Tayal

Reputation: 20840

You can use the Processor API to add custom headers. Implement the process method the same way you did to access the headers.

new Processor() {
    ......
    @Override
    public void process(String key, String value) {
        // add a header to the record; header values are byte arrays
        context().headers().add("key", "key".getBytes(StandardCharsets.UTF_8));
    }
    ...
}
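
To carry the new header through to the output topic, the same idea can be written as a Transformer placed between toStream() and the sink. The following is only a minimal sketch, assuming a Kafka Streams version that still ships the Transformer interface and exposes ProcessorContext.headers() (2.0 or later). The class name HeaderStampingTransformer, the stamp helper, and the header key/value "event-type"/"aggregated" are hypothetical names chosen for illustration.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical transformer: passes records through unchanged and
// replaces one header on the record context.
class HeaderStampingTransformer
        implements Transformer<String, byte[], KeyValue<String, byte[]>> {

    // Hypothetical header name and value, not taken from the question.
    static final String HEADER_KEY = "event-type";
    static final String HEADER_VALUE = "aggregated";

    private ProcessorContext context;

    // Removes any existing header with this key, then adds the new one.
    // Header values are raw bytes, so the string is encoded explicitly.
    static Headers stamp(Headers headers) {
        return headers
                .remove(HEADER_KEY)
                .add(HEADER_KEY, HEADER_VALUE.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, byte[]> transform(String key, byte[] value) {
        // Mutate the headers attached to the record currently being processed.
        stamp(context.headers());
        // Return the record so it continues downstream to the sink.
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

Under these assumptions it would be wired in as .toStream().transform(HeaderStampingTransformer::new).to("my-topic"). The existing header is removed first because add appends rather than replaces, so the filter in step 1 would otherwise still see the old value. Newer Kafka Streams releases deprecate transform in favour of process with the typed Record API, where the headers are reachable through record.headers().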

Upvotes: 1
