Reputation: 453
I am trying to use Kafka Streams with Scala. Below is my Java code, which works perfectly fine:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key, values) -> {
    System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
My Scala code is as follows:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
The Scala code throws a compilation error: type mismatch, expected: ForeachAction[_ >: String, _ >: String], actual: ((Any, Any), Unit); not found: value key; not found: value value
Does anyone know how to use the Streams API in Scala?
Upvotes: 2
Views: 1892
Reputation: 3250
You can print a Kafka stream directly using the print method:
textLines.print()
This prints the records of the stream. You can even print either the keys or the values by passing an argument to print.
Upvotes: 1
Reputation: 755
Your syntax is wrong :). -> is just an operator for creating pairs, so the expression
(key,value)-> {
println(key)
}
has type ((Any, Any), Unit), because the compiler cannot infer any type information (and key and value are not defined).
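To make the difference concrete, here is a minimal sketch that needs no Kafka at all. The names ArrowDemo, pair and describe are made up for illustration:

```scala
object ArrowDemo {
  // `->` builds a Tuple2; it does not introduce lambda parameters.
  val pair: (String, Int) = "key" -> 1

  // `=>` separates a lambda's parameter list from its body.
  val describe: (String, Int) => String = (key, value) => s"$key=$value"

  def main(args: Array[String]): Unit = {
    println(pair)               // prints (key,1)
    println(describe("key", 1)) // prints key=1
  }
}
```

In the question's code, key and value appear on the left of ->, so the compiler looks them up as existing values instead of treating them as parameters. That is where the "not found: value key" errors come from.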
If you are using Scala 2.12, replacing -> with => should solve the problem. If you are using an older version of Scala, you will have to implement the Java interface that foreach expects (ForeachAction, not BiFunction) explicitly:
textLines.foreach(new ForeachAction[String, String] { ... })
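For reference, a rough sketch of how the whole thing might look, assuming the old KStreamBuilder API used in the question (it was deprecated in later Kafka releases) and assuming config is the same Properties object as in the question. KStream.foreach is declared to take a ForeachAction, so that is the interface implemented here. The object name PrintTextLines and the method name start are made up:

```scala
import java.util.Properties

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.{ForeachAction, KStream, KStreamBuilder}

object PrintTextLines {
  // `config` is assumed to carry the application id, bootstrap servers
  // and String serdes, exactly as in the working Java version.
  def start(config: Properties): KafkaStreams = {
    val builder = new KStreamBuilder
    val textLines: KStream[String, String] = builder.stream("TextLinesTopic")

    // Scala 2.12 and later: a lambda should be accepted here, because
    // ForeachAction has a single abstract method:
    //   textLines.foreach((key, value) => println(value))

    // Scala 2.11 and earlier: implement the interface explicitly.
    textLines.foreach(new ForeachAction[String, String] {
      override def apply(key: String, value: String): Unit = println(value)
    })

    val streams = new KafkaStreams(builder, config)
    streams.start()
    streams
  }
}
```

The explicit anonymous class also compiles on 2.12, so it is the safer choice if the project has to build against several Scala versions.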
Upvotes: 5