user8215502

Reputation: 201

Flink: Stateful stream processing by key

I have a stream of data, such as JSON records, each with an ID.

I would like to process the data such that all records with the same key are processed by the same stateful task.

How can I do that?

Upvotes: 4

Views: 464

Answers (1)

Fabian Hueske

Reputation: 18997

This can be done with a stateful operator on a KeyedStream. A KeyedStream partitions all records on a key and ensures that all records with the same key go to the same operator instance and interact with the same state.

In code this looks like:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

val stream: DataStream[(String, Long)] = ???
val sumByKey: DataStream[(String, Long)] = stream
  .keyBy(_._1) // key on the first attribute
  .map(new SumMapper())

class SumMapper extends RichMapFunction[(String, Long), (String, Long)] {

  // keyed state holding the running sum; Flink scopes it to the current key
  var sumState: ValueState[Long] = _

  override def open(config: Configuration): Unit = {
    // configure and register the state
    val sumDesc: ValueStateDescriptor[Long] =
      new ValueStateDescriptor[Long]("sum", classOf[Long])
    sumState = getRuntimeContext.getState(sumDesc)
  }

  override def map(in: (String, Long)): (String, Long) = {
    val sum = sumState.value() // current sum for this key (null unboxes to 0L on first access)
    val newSum = sum + in._2   // compute new sum
    sumState.update(newSum)    // update state
    (in._1, newSum)            // emit result
  }
}
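
For the JSON records from the question, the same pattern applies once each record is parsed into something with an extractable key. Here is a minimal sketch, assuming a hypothetical Record(id: String, value: Long) case class (the field names are placeholders, not from the question) and reusing the SumMapper above:

import org.apache.flink.streaming.api.scala._

// hypothetical record type for the question's JSON data; field names are assumptions
case class Record(id: String, value: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// the source is left abstract here (like the ??? above); in practice the JSON payload
// would be deserialized into Record by the source or an upstream map
val records: DataStream[Record] = ???

val summed: DataStream[(String, Long)] = records
  .map(r => (r.id, r.value))   // bring the records into the (key, value) shape used above
  .keyBy(_._1)                 // all records with the same ID go to the same operator instance
  .map(new SumMapper())        // reuse the stateful mapper defined above

env.execute("keyed stateful sum")

Keying directly on the record with keyBy(_.id) would work just as well; mapping to a tuple first is only done here so that the SumMapper from the answer can be reused unchanged.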

Upvotes: 4
