Reputation: 31
I am using Kafka Streams 1.0. I am reading a topic into a KStream[String, CustomObject], and then I am trying to select a new key that comes from one member of the CustomObject. The code looks like this:
val myStream: KStream[String, CustomObject] = builder.stream("topic")
  .mapValues { value =>
    // ... code to transform the JSON into a CustomObject
    customObject
  }

myStream.selectKey((k, v) => v.id)
  .to("outputTopic", Produced.`with`(Serdes.String(), customObjectSerde))
It gives this error:
Error:(109, 7) overloaded method value to with alternatives:
(x$1: String,x$2: org.apache.kafka.streams.kstream.Produced[?0(in value x$1),com.myobject.CustomObject])Unit <and>
(x$1: org.apache.kafka.streams.processor.StreamPartitioner[_ >: ?0(in value x$1), _ >: com.myobject.CustomObject],x$2: String)Unit
cannot be applied to (String, org.apache.kafka.streams.kstream.Produced[String,com.myobject.CustomObject])
).to("outputTopic", Produced.`with`(Serdes.String(),
I am not able to understand what is wrong.
Hopefully somebody can help me. Thanks!
Upvotes: 3
Views: 10499
Reputation: 62350
The Kafka Streams API uses Java generic types extensively, which makes it hard for the Scala compiler to infer the types correctly. Thus, in some cases you need to specify the types manually to avoid ambiguous method overloads.
A good way to avoid these issues is to not chain multiple operators, but to introduce a new, explicitly typed KStream variable after each operation:
// not this
myStream.selectKey((k, v) => v.id)
  .to("outputTopic", Produced.`with`(Serdes.String(), customObjectSerde))

// but this
val newStream: KStream[KeyType, ValueType] = myStream.selectKey((k, v) => v.id)
newStream.to("outputTopic", Produced.`with`(Serdes.String(), customObjectSerde))
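Applied to the code from the question, a minimal sketch could look roughly like this (parseJson is a hypothetical helper, and customObjectSerde is assumed to exist as in the question):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{KStream, Produced}

val builder = new StreamsBuilder()

// parseJson is a hypothetical helper that turns the JSON value into a CustomObject
val myStream: KStream[String, CustomObject] = builder.stream[String, String]("topic")
  .mapValues(value => parseJson(value))

// giving the re-keyed stream an explicit type lets the compiler
// resolve the correct `to` overload
val keyedStream: KStream[String, CustomObject] = myStream.selectKey((k, v) => v.id)
keyedStream.to("outputTopic", Produced.`with`(Serdes.String(), customObjectSerde))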
Btw: Kafka 2.0 will offer a proper Scala API for Kafka Streams (https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams) that will fix those Scala issues.
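With that wrapper, the same pipeline could look roughly like the following (only a sketch against the KIP-270 API; the implicit Serde for CustomObject and the parseJson helper are assumptions you would have to provide yourself):

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._

// an implicit Serde for the custom type has to be in scope (assumed here)
implicit val customObjectSerde: Serde[CustomObject] = ???

val builder = new StreamsBuilder()
val myStream: KStream[String, CustomObject] =
  builder.stream[String, String]("topic").mapValues(value => parseJson(value))

// serdes are resolved implicitly, so no explicit Produced is needed
myStream.selectKey((k, v) => v.id).to("outputTopic")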
Upvotes: 5