Reputation: 312
Our library has data processors (Scala code) that use the standard Java kafka-streams API:
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
The reason for moving to kafka-streams-scala
is the weird return types I get whenever I use functions like groupBy, selectKey, etc.
(builder: StreamsBuilder, stream: KStream[String, Event]) =>
  val wgCreatedStream = stream.groupBy((_, v) =>
    v.payload match {
      case x: WorkgroupCreated => x.id // this is a String
    })
Take this code for example. With the Java StreamsBuilder and KStream imports above,
wgCreatedStream is inferred as KGroupedStream[Nothing, Event].
When I used these imports:
import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}
the return type finally changed to KGroupedStream[String, Event].
What I'm really hoping for: a way to use kafka-streams-scala
without refactoring our data processors.
If YES, that's fantastic, esp. if there are examples!
If NO... it'll be a painful journey. :( (but thanks anyway)
Upvotes: 2
Views: 185
Reputation: 312
Okay, after a chat with my workmate, I immediately found an answer. -___-
In org.apache.kafka.streams.scala.ImplicitConversions there's a wrapKStream()
that converts a streams.kstream.KStream (Java) to a streams.scala.kstream.KStream (Scala).
As for the other way around, just call inner directly on the Scala wrapper.
The same goes for KTable.
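A minimal sketch of how that might look, in case it helps someone else. Event, Payload and WorkgroupCreated are hypothetical stand-ins for the types in the question, and both serdes are assumed to be supplied by the caller (the Event serde in particular is something you have to write yourself). wrapKStream is called explicitly here for clarity; since it is an implicit conversion, importing ImplicitConversions._ should also let the Java stream be used where a Scala one is expected.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{KStream => KStreamJ}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.{KGroupedStream, KStream}

// Hypothetical domain types standing in for the ones in the question.
sealed trait Payload
final case class WorkgroupCreated(id: String) extends Payload
final case class Event(payload: Payload)

object WrapExample {

  // Takes the Java KStream the existing data processor already receives,
  // wraps it, and groups it through the Scala API.
  // Both serdes are assumptions: the caller must provide them.
  def groupByWorkgroupId(javaStream: KStreamJ[String, Event])(
      implicit keySerde: Serde[String],
      eventSerde: Serde[Event]
  ): KGroupedStream[String, Event] = {
    // Java KStream -> Scala KStream
    val scalaStream: KStream[String, Event] = wrapKStream(javaStream)

    // The key type is now inferred as String instead of Nothing.
    // The implicit Grouped[String, Event] is derived from the two serdes
    // by ImplicitConversions.
    scalaStream.groupBy { (_, v) =>
      v.payload match {
        case x: WorkgroupCreated => x.id
      }
    }
  }

  // Scala KStream -> Java KStream: the wrapper exposes the wrapped stream as `inner`.
  def unwrap(scalaStream: KStream[String, Event]): KStreamJ[String, Event] =
    scalaStream.inner
}
```

This way the processor signature can keep accepting the Java types, and only the body switches to the Scala wrappers.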
Upvotes: 4