Reputation: 51
I need a Kafka Streams application that sends multiple messages to the same topic, each with different Kafka headers.
I can send the messages as shown below by splitting the value on ~, but all of the messages go out with the same headers.
inputStream
    .transformValues(() -> new Transformation())
    .flatMapValues(value -> Arrays.asList(value.split("~")))
    .split()
    .branch(
        (key, value) -> key.startsWith("ERR"),
        Branched.withConsumer(ks -> ks.to(errorTopic)))
    .defaultBranch(Branched.withConsumer(ks -> ks.to(outboundTopic)));
Upvotes: 0
Views: 437
Reputation: 62350
If you want to modify headers, you need to use a process() step.
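stream.process(() -> new MyProcessor());

// `api` is the package org.apache.kafka.streams.processor.api
class MyProcessor implements api.Processor<...> {
    private api.ProcessorContext<...> context;

    public void init(api.ProcessorContext<...> context) {
        this.context = context; // keep the context so process() can forward records
    }

    public void process(Record<...> record) {
        // access headers via `record.headers()`
        // replace them with `record.withHeaders(...)`, which returns a new Record
        context.forward(record); // send result record downstream
    }
}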
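To get a different header set per split message, the splitting itself would also have to move into the processor, so that each part is forwarded as its own record. Below is a minimal sketch of only the per-part header logic, written against the plain JDK so it runs without Kafka on the classpath. The class name `HeaderSplitter`, the `Outgoing` holder, and the header names `part-index` and `part-count` are all hypothetical and not taken from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderSplitter {

    /** One outgoing message: a value plus the headers that belong to it alone. */
    public static final class Outgoing {
        public final String value;
        public final Map<String, byte[]> headers;

        Outgoing(String value, Map<String, byte[]> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    /**
     * Splits the value on '~' and builds a separate header map for each part.
     * The header names used here are illustrative assumptions.
     */
    public static List<Outgoing> split(String value) {
        String[] parts = value.split("~");
        List<Outgoing> out = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            // A fresh map per part, so no two messages share header state.
            Map<String, byte[]> headers = new LinkedHashMap<>();
            headers.put("part-index", utf8(Integer.toString(i)));
            headers.put("part-count", utf8(Integer.toString(parts.length)));
            out.add(new Outgoing(parts[i], headers));
        }
        return out;
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (Outgoing o : split("a~b~c")) {
            String index = new String(o.headers.get("part-index"), StandardCharsets.UTF_8);
            System.out.println(o.value + " part-index=" + index);
        }
    }
}
```

Inside `process()`, each `Outgoing` would presumably become one forwarded record: copy its entries into a new `RecordHeaders` with `add(String, byte[])`, then call `context.forward(record.withValue(o.value).withHeaders(headers))`. Building a new headers object per part matters because records forwarded with the same headers instance would otherwise share any modification.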
Upvotes: 1