Reputation: 121
I have multiple events in a topic and I am trying to process them in these steps:
I can access the headers using transformValues, but I am not sure how to inject new header values when calling toStream.
streamsBuilder.stream("my-topic")
    .transformValues(new Transformer()) // access headers here and filter a few events
    .groupByKey(Serialized.with(Serdes.String(), null))
    .aggregate(() -> /* my Avro object initialization */, (key, value, aggregate) -> newValue(value, aggregate), Materialized.as("my-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.ByteArray()))
    .mapValues((key, value) -> /* convert to bytes */).toStream()
NB: I am new to KStream.
Upvotes: 1
Views: 2232
Reputation: 20840
You can use the Processor API to add custom headers. Implement the process method in the same way as you did to access the headers.
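new Processor<String, String>() {
......
@Override
public void process(String key, String value) {
// add a header to the record; header values are byte arrays, not strings
// (needs java.nio.charset.StandardCharsets; context() is the ProcessorContext
// saved in init(), or the accessor inherited from AbstractProcessor)
context().headers().add("key", "key".getBytes(StandardCharsets.UTF_8));
// forward the record downstream so the new header travels with it
context().forward(key, value);
}
...
}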
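One detail that is easy to trip over: Kafka header values are raw bytes, so a string has to be encoded before it is passed to Headers.add(String, byte[]) and decoded again when it is read back. A minimal sketch of that conversion, using only the JDK; the HeaderValues class and its method names are hypothetical helpers for illustration, not part of Kafka:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Kafka class): converts header values
// between String and the byte[] form that Kafka headers store.
final class HeaderValues {
    private HeaderValues() {}

    // Encode a string as UTF-8 bytes, suitable as a header value.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a header value that was written as UTF-8 text.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

Inside process, this would be used as context().headers().add("key", HeaderValues.encode("key")). Fixing the charset explicitly avoids depending on the platform default on either the producing or the consuming side.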
Upvotes: 1