Sajeevan mp

Reputation: 51

Kafka Stream to send Multiple messages with different headers to same topic

I need a Kafka Streams application that sends multiple messages to the same topic, each with different Kafka headers.

I can produce the messages as shown below by splitting the value on ~, but all of the resulting messages carry the same headers.

inputStream
  .transformValues(()->new Transformation())
  .flatMapValues(value->Arrays.asList(value.split("~")))
  .split()
    .branch(
      (key,value)->key.startsWith("ERR"),
      Branched.withConsumer(ks -> ks.to(errorTopic)))
    .defaultBranch(Branched.withConsumer(ks -> ks.to(outboundTopic)));
            

Upvotes: 0

Views: 437

Answers (1)

Matthias J. Sax

Reputation: 62350

If you want to modify headers, you need to use a process() step, because the processor receives the full record, including its headers.

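stream.process(() -> new MyProcessor());

// Processor, ProcessorContext and Record are in org.apache.kafka.streams.processor.api.
// Key/value types are assumed to be String, as in the question.
class MyProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context; // keep the context so process() can forward records
    }

    @Override
    public void process(Record<String, String> record) {
      // access headers via `record.headers()`
      // replace them on a copy of the record with `record.withHeaders(...)`

      context.forward(record); // send result record downstream
    }
}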
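
For the case in the question, the split on ~ and the per-message header choice can be worked out together, so that each part gets its own headers before it is forwarded. Below is a minimal sketch of that logic in plain Java, kept free of Kafka types so it can be run on its own. The class `HeaderSplitter`, the type `Part`, and the header names `part-index` and `part-count` are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: decides the value and the headers
// of every outgoing message derived from one incoming value.
final class HeaderSplitter {

    // One outgoing message: its value plus the headers only this message carries.
    static final class Part {
        final String value;
        final Map<String, String> headers;

        Part(String value, Map<String, String> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    // Splits on "~" (as in the question) and builds a separate header map per part.
    // The header names are assumptions chosen for illustration.
    static List<Part> split(String value) {
        String[] pieces = value.split("~");
        List<Part> parts = new ArrayList<>();
        for (int i = 0; i < pieces.length; i++) {
            Map<String, String> headers = new LinkedHashMap<>();
            headers.put("part-index", Integer.toString(i));
            headers.put("part-count", Integer.toString(pieces.length));
            parts.add(new Part(pieces[i], headers));
        }
        return parts;
    }
}
```

Inside process(), each `Part` would then be forwarded as its own record, for example by copying its map into an `org.apache.kafka.common.header.internals.RecordHeaders` and calling `context.forward(record.withValue(part.value).withHeaders(headers))`. With this approach the split would move out of flatMapValues() and into the processor, since flatMapValues() only sees the value and not the headers.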

Upvotes: 1
