Jitterbug

Reputation: 312

How to convert the Java api KStream to Scala api KStream?

Our library has data processors (Scala code) that use the regular Java kafka-streams API:

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

The reason for moving to kafka-streams-scala is the odd return types I get whenever I use functions like groupBy, selectKey, etc.

(builder: StreamsBuilder, stream: KStream[String, Event]) =>
  val wgCreatedStream = stream.groupBy((_, v) =>
    v.payload match {
      case x: WorkgroupCreated => x.id // this is a String
    })

Take this code for example: with the Java API imports above, the groupBy call results in a KGroupedStream[Nothing, Event].

When I used these imports:

import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}

the return type finally changed to KGroupedStream[String, Event].

What I'm really hoping for: to use kafka-streams-scala without refactoring our data processors.

If YES, that's fantastic, especially if there are examples!
If NO... it'll be a painful journey. :( (but thanks anyway)

Upvotes: 2

Views: 185

Answers (1)

Jitterbug

Reputation: 312

Okay, after a chat with my workmate, I immediately found an answer. -___-

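In org.apache.kafka.streams.scala.ImplicitConversions there's a wrapKStream() that converts an
org.apache.kafka.streams.kstream.KStream into an org.apache.kafka.streams.scala.kstream.KStream.
It is declared implicit, so importing ImplicitConversions._ is enough for the conversion to be applied automatically.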

As for the other way around, just use the Scala wrapper's inner member directly; it holds the underlying Java stream. The same goes for KTable.
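
Here's a minimal sketch of both directions, in case it helps. Assumptions on my side: kafka-streams-scala 2.1 or later (where the Scala groupBy takes an implicit Grouped), a made-up Event case class standing in for our real one, and serdes supplied by the caller as implicits. The names KStreamInterop, toScala, toJava and groupById are just for illustration.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{KStream => KStreamJ}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KStream}

// Hypothetical stand-in for the Event type from the question.
final case class Event(id: String)

object KStreamInterop {
  // Java -> Scala: call wrapKStream explicitly.
  def toScala[K, V](javaStream: KStreamJ[K, V]): KStream[K, V] =
    wrapKStream(javaStream)

  // Scala -> Java: the wrapper exposes the underlying Java stream as `inner`.
  def toJava[K, V](scalaStream: KStream[K, V]): KStreamJ[K, V] =
    scalaStream.inner

  // Accepts the Java stream the existing processors already receive and
  // groups it through the Scala API, so the key type is inferred as String.
  def groupById(javaStream: KStreamJ[String, Event])(
      implicit keySerde: Serde[String],
      valueSerde: Serde[Event]
  ): KGroupedStream[String, Event] = {
    // wrapKStream is implicit, so the type ascription alone triggers it.
    val scalaStream: KStream[String, Event] = javaStream
    // The implicit Grouped[String, Event] is derived from the two serdes.
    scalaStream.groupBy((_, v) => v.id)
  }
}
```

With this shape the processor signatures can keep taking the Java KStream, and only the body switches to the Scala wrapper where the better type inference is wanted.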

Upvotes: 4
