user1733735

Reputation: 453

Kafka streams with Scala

I am trying to use Kafka Streams with Scala. Below is my Java code, which works perfectly fine:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key, values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

My Scala code is as follows:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

The Scala code fails to compile with these errors: type mismatch, expected: ForeachAction[_ >: String, _ >: String], actual: ((Any, Any), Unit); not found: value key; not found: value value.

Does anyone know how to use the Streams API in Scala?

Upvotes: 2

Views: 1892

Answers (2)

Mahesh Chand

Reputation: 3250

You can print a KStream directly using its print method:

textLines.print

This prints the records in the stream. You can also print only the keys or only the values by passing an argument to print.

Upvotes: 1

L.Lampart

Reputation: 755

Your syntax is wrong :). -> is just an operator for creating pairs, so the expression

(key,value)-> {
  println(key)
}

has type ((Any, Any), Unit), because the compiler cannot infer any type information (and key and value are not defined at that point).
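To make the difference concrete, here is a minimal sketch in plain Scala with no Kafka dependency. The object name and the sample strings are made up for illustration. It shows that -> builds a pair and evaluates its right-hand side immediately, while => builds a function whose body runs only when called:

```scala
object ArrowDemo {
  // `a -> b` is sugar for the tuple (a, b). The block on the right is
  // evaluated once, right here, and its result (Unit) becomes the second element.
  val pair: ((String, String), Unit) = ("k", "v") -> { println("evaluated eagerly") }

  // `=>` defines a function. `key` and `value` are its parameters, and the
  // body runs only when the function is applied.
  val printKey: (String, String) => Unit = (key, value) => println(key)

  def main(args: Array[String]): Unit = {
    println(pair._1)                     // the (key, value) tuple itself
    printKey("some-key", "some-value")   // the function body runs now
  }
}
```

In the question's code there are no key and value in scope for the tuple to capture, which is why the compiler also reports them as not found.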

If you are using Scala 2.12, replacing -> with => should solve the problem. If you are using an older version of Scala, you will have to implement the Java interface that foreach expects (ForeachAction, as the error message shows) explicitly:

 textLines.foreach(new ForeachAction[String, String] { ... })
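Both variants can be sketched side by side. The example below is only an illustration that runs without Kafka on the classpath: the ForeachAction trait and the foreach helper are hypothetical stand-ins for org.apache.kafka.streams.kstream.ForeachAction and KStream#foreach, and the sample records are invented. In real code you would import the Kafka interface rather than declare your own.

```scala
object ForeachDemo {
  // Hypothetical stand-in for Kafka's ForeachAction: an interface with a
  // single abstract method taking a key and a value.
  trait ForeachAction[K, V] {
    def apply(key: K, value: V): Unit
  }

  // Hypothetical stand-in for KStream#foreach: runs the action on each record.
  def foreach[K, V](records: Seq[(K, V)])(action: ForeachAction[K, V]): Unit =
    records.foreach { case (k, v) => action.apply(k, v) }

  def main(args: Array[String]): Unit = {
    val records = Seq("k1" -> "hello", "k2" -> "world")

    // Works on any Scala version: implement the interface explicitly.
    foreach(records)(new ForeachAction[String, String] {
      override def apply(key: String, value: String): Unit = println(value)
    })

    // Scala 2.12 and later: a lambda written with => is converted to the
    // single-abstract-method interface automatically.
    foreach(records)((key, value) => println(value))
  }
}
```

The anonymous class is more verbose, but it is the form that compiles regardless of Scala version, so it is a reasonable fallback if the lambda is rejected.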

Upvotes: 5
