ronald de Haan

Reputation: 23

Extract Seq using Kafka Streams

I'm trying to read a Kafka topic, do some processing based on that, and store the results in another topic.

My code looks like this:

builder
   .stream(settings.Streams.inputTopic)
   .mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e))
   // Something needs to be done here...
   .to(settings.Streams.outputTopic)

The fx(e) function does some processing and returns a Seq[Product]. I'd like to store each product as a separate record in the output topic. The problem is that each message read from the input topic contains multiple products, hence the Seq return value of fx(e).

Is it possible to embed this behaviour in the stream?

Upvotes: 1

Views: 192

Answers (1)

Jeffrey Chung

Reputation: 19527

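Use flatMapValues instead of mapValues. It emits one output record per element of the returned collection, each keeping the key of the input record. The Java API expects a java.lang.Iterable, hence the asJava conversion: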

import scala.collection.JavaConverters.asJavaIterableConverter

builder
  .stream(settings.Streams.inputTopic)
  .flatMapValues(e => fx(e).toIterable.asJava)
  .to(settings.Streams.outputTopic)
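
To see what this does to the records, here is a minimal sketch that mimics flatMapValues on a plain in-memory Seq of (key, value) pairs. It does not use Kafka Streams at all. The Product and EventRecord definitions, the fx body, and the local flatMapValues helper are hypothetical stand-ins for illustration, not the types from the question or the library's implementation.

```scala
// Hypothetical stand-ins for the types in the question.
final case class Product(name: String)
final case class EventRecord(products: Seq[String])

object FlatMapValuesSketch {
  // Stand-in for fx: one event yields zero or more products.
  def fx(e: EventRecord): Seq[Product] = e.products.map(Product(_))

  // Mimics the behaviour of flatMapValues on an in-memory list of records:
  // every element returned by f becomes its own record, keyed like its input.
  def flatMapValues[K, V, VR](records: Seq[(K, V)])(f: V => Iterable[VR]): Seq[(K, VR)] =
    records.flatMap { case (k, v) => f(v).map(vr => (k, vr)) }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      "order-1" -> EventRecord(Seq("apple", "pear")),
      "order-2" -> EventRecord(Seq.empty), // produces no output record
      "order-3" -> EventRecord(Seq("plum"))
    )
    // Prints three records: two keyed order-1 and one keyed order-3.
    flatMapValues(input)(fx).foreach(println)
  }
}
```

An event with an empty Seq simply disappears from the output, which is usually what you want when a message carries no products.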

Upvotes: 3
