I have a stream of data, such as JSON records, where each record carries an ID.
I would like to process the data so that all records with the same key (ID) are processed by the same stateful task.
How can I do that?
This can be done with a stateful operator on a KeyedStream, which you get by calling keyBy on a DataStream. A KeyedStream partitions all records by key and ensures that all records with the same key go to the same operator instance and interact with the same state.
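To make the routing guarantee concrete, here is a simplified model with no Flink dependency. It is a sketch, not Flink's implementation: Flink first hashes keys into key groups (bounded by the max parallelism) and then maps key groups to operator instances, using a different hash than the plain hashCode-modulo used here. The names KeyRoutingSketch, instanceFor and route are hypothetical. The property it shows is the one that matters: routing is a deterministic function of the key, so every record with a given key ends up at one instance.

```scala
// Simplified, hypothetical model of key-based routing. This is NOT Flink's
// actual assignment (Flink goes through key groups and a different hash);
// it only illustrates that routing depends on the key alone.
object KeyRoutingSketch {

  // Deterministically pick an operator instance in [0, parallelism) for a key.
  def instanceFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Group records by the instance their key is routed to.
  def route(records: Seq[(String, Long)], parallelism: Int): Map[Int, Seq[(String, Long)]] =
    records.groupBy { case (key, _) => instanceFor(key, parallelism) }

  def main(args: Array[String]): Unit = {
    val records = Seq(("a", 1L), ("b", 2L), ("a", 3L), ("c", 4L), ("b", 5L))
    route(records, parallelism = 2).toSeq.sortBy(_._1).foreach { case (idx, recs) =>
      println(s"instance $idx receives ${recs.mkString(", ")}")
    }
  }
}
```

Whatever the parallelism, all "a" records land in the same group, which is why a per-key state handle on that instance always sees every update for its key.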
In code this looks like:
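import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

val stream: DataStream[(String, Long)] = ???

val sumByKey: DataStream[(String, Long)] = stream
  .keyBy(_._1)          // key on the first attribute
  .map(new SumMapper()) // stateful map; state is scoped to the current key

class SumMapper extends RichMapFunction[(String, Long), (String, Long)] {

  var sumState: ValueState[Long] = _

  override def open(config: Configuration): Unit = {
    // register the keyed state handle
    val sumDesc: ValueStateDescriptor[Long] =
      new ValueStateDescriptor[Long]("sum", classOf[Long])
    sumState = getRuntimeContext.getState(sumDesc)
  }

  override def map(in: (String, Long)): (String, Long) = {
    // value() returns null for a key without state yet;
    // Scala unboxes that null to 0L, so each key's sum starts at 0
    val sum = sumState.value() // get current sum for this key
    val newSum = sum + in._2   // compute new sum
    sumState.update(newSum)    // update state for this key
    (in._1, newSum)            // emit result
  }
}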
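The per-key behavior of SumMapper can be modeled in plain Scala to see what the job emits. This is a hypothetical single-process sketch, not Flink code: a mutable Map stands in for the keyed state backend, holding one running sum per key, the way Flink keeps one ValueState[Long] per key. The name KeyedSumModel is made up for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of SumMapper's semantics; a mutable Map plays the
// role of Flink's keyed state (one running sum per key). Not Flink code.
object KeyedSumModel {

  def runningSums(records: Seq[(String, Long)]): Seq[(String, Long)] = {
    val state = mutable.Map.empty[String, Long] // key -> current sum
    records.map { case (key, value) =>
      val sum = state.getOrElse(key, 0L) // like sumState.value(), 0 if unset
      val newSum = sum + value           // compute new sum
      state.update(key, newSum)          // like sumState.update(newSum)
      (key, newSum)                      // emit result
    }
  }

  def main(args: Array[String]): Unit =
    runningSums(Seq(("a", 1L), ("b", 2L), ("a", 3L))).foreach(println)
}
```

For the input ("a", 1), ("b", 2), ("a", 3) this model emits ("a", 1), ("b", 2), ("a", 4). In a real parallel job, records for different keys may be interleaved differently, but each key's running sum only ever reflects that key's own records.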