user2538255

Reputation: 313

Kafka Streams: one record to multiple records

I have two topics in Kafka, say topic A and topic B. A Kafka Streams application reads a record from topic A, processes it, and produces multiple records (say recordA and recordB) for each consumed record. How can I achieve this using Kafka Streams?

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
    @Override
    public List<Message> apply(final Message message) {
        return consumerRecordHandler.process(message);
    }
}).*someFunction*()

Here, the record read is a Message; processing it returns a list of Message objects. How can I divide this list into two producer streams? Any help will be appreciated.

Upvotes: 8

Views: 10919

Answers (2)

Matthias J. Sax

Reputation: 62350

I am not sure if I understand the question correctly, and I also don't understand the answer from @Abhishek :(

If you have an input stream and you want to get zero, one, or more output records per input record, you would apply flatMap() or flatMapValues() (depending on whether you want to modify the key or not).
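As a rough illustration of flatMapValues(), here is a minimal sketch. It assumes String keys and values instead of the Message type from the question; the topic names and the process() helper (a stand-in for consumerRecordHandler.process()) are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FlatMapValuesSketch {

    // Hypothetical stand-in for consumerRecordHandler.process(message):
    // turns one comma-separated value into zero or more output values.
    static List<String> process(String message) {
        if (message == null || message.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(message.split(","));
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic names are assumptions made for this sketch.
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Each element of the returned list becomes its own output record;
        // the key of the input record is kept unchanged.
        KStream<String, String> output = input.flatMapValues(value -> process(value));

        output.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

Compared with mapValues(), which would yield a single record whose value is the whole list, flatMapValues() emits one record per list element, so the result is a plain KStream<String, String> rather than a KStream<String, List<String>>.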

You also ask, "How can I divide this list to two producer streams?" If you mean splitting one stream into multiple streams, you can use branch().
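To make the splitting step concrete, here is a minimal sketch. Note that in Kafka Streams 2.8 and later, branch() is deprecated in favor of split() with Branched, so the sketch uses that newer form rather than the branch() call named above. The String types, the topic names, and the isTypeA() routing rule are all assumptions made for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class SplitStreamSketch {

    // Hypothetical routing rule: values prefixed with "A:" go to the first branch.
    static boolean isTypeA(String value) {
        return value != null && value.startsWith("A:");
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic names are assumptions made for this sketch.
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        input.split()
            // Records matching the predicate are written to the first output topic.
            .branch((key, value) -> isTypeA(value),
                Branched.withConsumer(ks ->
                    ks.to("output-a", Produced.with(Serdes.String(), Serdes.String()))))
            // Every record that matched no earlier branch goes to the second topic.
            .defaultBranch(
                Branched.withConsumer(ks ->
                    ks.to("output-b", Produced.with(Serdes.String(), Serdes.String()))));

        return builder.build();
    }
}
```

Each record ends up in at most one branch: predicates are evaluated in order and the first match wins. Applying flatMapValues() before split() would let each element of the processed list be routed independently.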

For more details, see the docs: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations

Upvotes: 20

Abhishek

Reputation: 1225

What's your key type? I am guessing it's not String. After executing mapValues you'll have a KStream<K, List<Message>>. If K is not String, then someFunction() can be a map that converts K into String and leaves the List<Message> (the value) untouched, since that's your intended end result. If K is already String, you already have the result.

Upvotes: 3
