How to change the key of a KStream and then write to a topic using Scala?

I am using Kafka Streams 1.0. I read a topic into a KStream[String, CustomObject] and then try to select a new key taken from one member of the CustomObject. The code looks like this:

 val myStream: KStream[String, CustomObject] = builder.stream("topic")
  .mapValues {
      ...
      //code to transform json to CustomObject
      customObject
   }
 myStream.selectKey((k,v) => v.id)
 .to("outputTopic", Produced.`with`(Serdes.String(),
      customObjectSerde))

It gives this error:

Error:(109, 7) overloaded method value to with alternatives:
  (x$1: String,x$2: org.apache.kafka.streams.kstream.Produced[?0(in value x$1),com.myobject.CustomObject])Unit <and>
  (x$1: org.apache.kafka.streams.processor.StreamPartitioner[_ >: ?0(in value x$1), _ >: com.myobject.CustomObject],x$2: String)Unit
 cannot be applied to (String, org.apache.kafka.streams.kstream.Produced[String,com.myobject.CustomObject])
    ).to("outputTopic", Produced.`with`(Serdes.String(),

I am not able to understand what is wrong.

Hopefully somebody can help me. Thanks!

Upvotes: 3

Views: 10499

Answers (1)

Matthias J. Sax

Reputation: 62350

The Kafka Streams API uses Java generics extensively, which makes it hard for the Scala compiler to infer types correctly. You therefore need to specify types explicitly in some cases to avoid ambiguous method overloads.

Also compare: https://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type-t

A good way to avoid these issues is to not chain multiple operators, but instead to introduce a new, explicitly typed KStream variable after each operation:

// not this
myStream.selectKey((k,v) => v.id)
        .to("outputTopic", Produced.`with`(Serdes.String(),customObjectSerde))

// but this (in the question's code, KeyType is String and ValueType is CustomObject)
val newStream: KStream[KeyType,ValueType] = myStream.selectKey((k,v) => v.id)
newStream.to("outputTopic", Produced.`with`(Serdes.String(),customObjectSerde))
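For reference, here is a fuller sketch of the "typed intermediate variable" approach against the Java API. It is only an illustration under some assumptions: the CustomObject case class with a String id field is a hypothetical stand-in for the question's class, the serde is passed in as a parameter, and the stream is read directly with that serde instead of going through the question's mapValues step. The KeyValueMapper is written out as an anonymous class so that the code does not depend on Scala 2.12 lambda-to-SAM conversion.

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, KStream, KeyValueMapper, Produced}

object SelectKeyExample {
  // Hypothetical stand-in for the question's CustomObject (assumed to have a String id).
  final case class CustomObject(id: String, payload: String)

  def buildTopology(customObjectSerde: Serde[CustomObject]): Topology = {
    val builder = new StreamsBuilder()

    // Explicit type annotation on the source stream.
    val myStream: KStream[String, CustomObject] =
      builder.stream("topic", Consumed.`with`(Serdes.String(), customObjectSerde))

    // Fully typed mapper: key in, value in, new key out.
    val byId: KeyValueMapper[String, CustomObject, String] =
      new KeyValueMapper[String, CustomObject, String] {
        override def apply(key: String, value: CustomObject): String = value.id
      }

    // The annotation pins the new key type to String, so the call to `to`
    // below resolves to the (String, Produced[K, V]) overload.
    val newStream: KStream[String, CustomObject] = myStream.selectKey(byId)
    newStream.to("outputTopic", Produced.`with`(Serdes.String(), customObjectSerde))

    builder.build()
  }
}
```

The important part is the annotation on newStream: once the key type is fixed there, the compiler no longer has to infer it through the chained call.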

Btw: Kafka 2.0 will offer a proper Scala API for Kafka Streams (https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams) that will fix those Scala issues.
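A rough sketch of what this looks like with the Scala wrapper (the kafka-streams-scala artifact, Kafka 2.0 or later). To keep it self-contained it uses plain String values rather than CustomObject, and the rule for deriving the new key (everything before the first comma) is purely an assumption for illustration. The import of org.apache.kafka.streams.scala.Serdes matches the 2.0-era package layout; newer releases moved the implicit serdes to a different package, so check the version you use.

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream

object ScalaApiExample {
  def buildTopology(): Topology = {
    val builder = new StreamsBuilder()

    // Consumed[String, String] is derived implicitly from the imported serdes.
    val input: KStream[String, String] = builder.stream[String, String]("topic")

    input
      // Hypothetical key extraction: take the text before the first comma.
      .selectKey((_, value) => value.takeWhile(_ != ','))
      // Produced[String, String] is also resolved implicitly.
      .to("outputTopic")

    builder.build()
  }
}
```

With the wrapper, chaining works because selectKey takes an ordinary Scala function and the serdes are picked up implicitly; for a custom value type you would need an implicit Serde for that type in scope.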

Upvotes: 5
