Arun Jose

Reputation: 365

Usage of updateStateByKey() in Spark Streaming to produce a stream of state changes from a stream of raw events

I've just started looking for a solution for stateful computation with Spark Streaming, and I came across the updateStateByKey() function.

The problem I'm trying to solve: 10,000 sensors produce a binary value every minute.

If two consecutive values a sensor reports differ from each other, I would like to flag that and send it down to Kafka as a state-change event.

My assumption is that updateStateByKey() can be used here; however, I'm not sure of the recommended way to implement it.

Upvotes: 5

Views: 4203

Answers (1)

Patrick McGloin

Reputation: 2234

I am assuming that you will get a stream of (String, Int) pairs from the sensors, where the String is the sensor's ID and the Int is the binary value it returned. With that assumption you could try something like this:

import org.apache.spark.streaming.dstream.DStream

val sensorData: DStream[(String, Int)] = ???

// updateStateByKey already partitions the stream by key, so the update
// function is called once per sensor ID, with that sensor's new readings
// for the batch and its previously stored state.
def updateFunction(newValues: Seq[Int], lastState: Option[Int]): Option[Int] = {
    newValues.lastOption match {
        case Some(latest) =>
            // Flag a change when the latest reading differs from the stored one.
            if (lastState.exists(_ != latest)) {
                // send a state change event to Kafka
            }
            Some(latest) // the latest reading becomes the new state
        case None =>
            lastState // no new data in this batch; keep the old state
    }
}

val state: DStream[(String, Int)] = sensorData.updateStateByKey(updateFunction _)
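One thing to watch out for: updateStateByKey() requires a checkpoint directory to be set, otherwise the job will fail at startup. Below is a minimal sketch of the driver wiring, assuming a local run with a one-minute batch interval to match the sensors' reporting rate; the application name and checkpoint path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("SensorStateChanges").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(60))

// State is persisted between batches, so a checkpoint directory is mandatory.
// "/tmp/sensor-checkpoint" is just a placeholder path.
ssc.checkpoint("/tmp/sensor-checkpoint")

// Build sensorData from your real source here (e.g. a Kafka direct stream),
// apply updateStateByKey as shown above, then start the streaming job.
ssc.start()
ssc.awaitTermination()

Also, rather than producing to Kafka from inside the update function, you may prefer to keep the update function pure (e.g. carry the detected change in the state) and publish the events from a foreachRDD/foreachPartition block, so a producer is created once per partition instead of once per record.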

Upvotes: 3
