Reputation: 2628
I want to sum one columns data based on the key specified. Stream is like id(String) Key, value(Long).
val aggtimelogs: KTable[String, java.lang.Long] = stream
.groupByKey()
.aggregate(
() => 0L,
(key: String, value: java.lang.Long, aggregate: java.lang.Long) => value + aggregate)
//Failing here
Getting
Unspecified value parameters: : Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]
How to do in Scala?
Kafka version is
compile "org.apache.kafka:kafka-clients:2.0.0"
compile (group: "org.apache.kafka", name: "kafka-streams", version: "2.0.0"){
exclude group:"com.fasterxml.jackson.core"
}
Even I tried this
val reducer = new Reducer[java.lang.Long]() {
def apply(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = value1 + value2
}
val agg = stream
.groupByKey()
.reduce(reducer)
Also this
val reducer : Reducer[Long] = (value1: Long, value2: Long) => value1 + value2
Says
StreamAggregation.scala:39: type mismatch;
found : (Long, Long) => Long
required: org.apache.kafka.streams.kstream.Reducer[Long]
val reducer : Reducer[Long] = (value1: Long, value2: Long) => value1 + value2
Upvotes: 0
Views: 616
Reputation: 2628
I did it like this
val aggVal = streams.groupByKey().reduce(new Reducer[Double]() {
def apply(val1: Double, val2: Double): Double = val1 + val2
})
Upvotes: 2