Reputation: 23
I'm trying to read a Kafka topic, do some processing based on that, and store the results in another topic.
My code looks like this:
builder
.stream(settings.Streams.inputTopic)
.mapValues[Seq[Product]]((e: EventRecord) ⇒ fx(e))
// Something needs to be done here...
.to(settings.Streams.outputTopic)
The fx(e) function does some processing and returns a Seq[Product]. I'd like to store each product as a separate entry in the output topic. The problem is that each message read from the input topic contains multiple products, which is why fx(e) returns a Seq.
Is it possible to embed this behaviour in the stream?
Upvotes: 1
Views: 192
Reputation: 19527
Use flatMapValues instead of mapValues:
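// Deprecated since Scala 2.13 in favour of scala.jdk.CollectionConverters._
import scala.collection.JavaConverters.asJavaIterableConverter
builder
.stream(settings.Streams.inputTopic)
// Emit one output record per Product; each keeps the key of its source record.
.flatMapValues(e => fx(e).toIterable.asJava)
.to(settings.Streams.outputTopic)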
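To see what changes between the two operators, here is a minimal sketch that models the behaviour with plain Scala collections rather than a real Kafka Streams topology. The names Item, Event, and the fx body are hypothetical stand-ins for the question's Product, EventRecord, and fx, and the "topic" is just a list of key/value pairs.

```scala
object FlatMapValuesSketch {
  // Hypothetical stand-ins for the question's Product and EventRecord types.
  final case class Item(name: String)
  final case class Event(items: Seq[Item])

  // Stand-in for fx: one event yields several items.
  def fx(e: Event): Seq[Item] = e.items

  // A "topic" modelled as a plain sequence of (key, value) records.
  val input: Seq[(String, Event)] = Seq(
    "k1" -> Event(Seq(Item("a"), Item("b"))),
    "k2" -> Event(Seq(Item("c")))
  )

  // mapValues-like behaviour: one output record per input record,
  // and the value is the whole Seq.
  val mapped: Seq[(String, Seq[Item])] =
    input.map { case (k, v) => k -> fx(v) }

  // flatMapValues-like behaviour: one output record per element,
  // with the original key repeated on each.
  val flatMapped: Seq[(String, Item)] =
    input.flatMap { case (k, v) => fx(v).map(item => k -> item) }
}
```

With this input, mapped holds two records whose values are sequences, while flatMapped holds three records with one item each. If the topology is built with the kafka-streams-scala wrapper rather than the Java API, its flatMapValues should accept a Scala function returning an Iterable directly, so the asJava conversion may not be needed there.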
Upvotes: 3