Reputation: 365
I was looking around for a solution for stateful computation with Spark Streaming when I came across the updateStateByKey() function.
The problem I'm trying to solve: 10,000 sensors produce a binary value every minute.
If two consecutive values reported by a sensor differ, I would like to flag that and send it down to Kafka as a state change event.
My assumption is that updateStateByKey() can be used for this, but I'm not sure what the recommended way to implement it is.
Upvotes: 5
Views: 4203
Reputation: 2234
I am assuming that you will get a stream of (String, Int) pairs from the sensors, where the String is the sensor's ID and the Int is the binary value it reports. With that assumption, you could try something like this:
import org.apache.spark.streaming.dstream.DStream

val sensorData: DStream[(String, Int)] = ???

// The state kept per sensor ID is simply the last value that sensor reported.
val state: DStream[(String, Int)] = sensorData.updateStateByKey[Int](updateFunction _)

def updateFunction(newValues: Seq[Int], lastValue: Option[Int]): Option[Int] = {
  // Fold over this batch's readings in arrival order, comparing each one
  // to the value that came before it.
  newValues.foldLeft(lastValue) { (previous, value) =>
    if (previous.exists(_ != value)) {
      // consecutive values differ -- send a state change event to Kafka
    }
    Some(value)
  }
}

Note that updateStateByKey calls the update function once per key, so newValues holds only that sensor's readings for the current batch and the state is the last value seen for that sensor.
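Two caveats, assuming ssc is your StreamingContext: updateStateByKey will not run without checkpointing enabled, and producing to Kafka from inside the update function means constructing a producer on the executors. A common alternative is to keep the update function pure, record whether a change happened as part of the state, and publish the change events with foreachRDD. Here is a minimal sketch of that pattern; the checkpoint path, broker address and topic name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Stateful transformations require a checkpoint directory.
ssc.checkpoint("/tmp/spark-checkpoint")  // placeholder path

// State per sensor: (last value, whether it changed in this batch).
val changes = sensorData.updateStateByKey[(Int, Boolean)] { (newValues, state) =>
  val last = state.map(_._1)
  // A change happened if the first new reading differs from the stored value,
  // or if any two consecutive readings within this batch differ.
  val changedAcrossBatches = (for (l <- last; h <- newValues.headOption) yield l != h).getOrElse(false)
  val changedWithinBatch = newValues.sliding(2).exists(w => w.size == 2 && w(0) != w(1))
  newValues.lastOption.orElse(last).map(v => (v, changedAcrossBatches || changedWithinBatch))
}

// Publish only the sensors whose value actually changed in this batch.
changes.filter(_._2._2).foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { case (id, (value, _)) =>
      producer.send(new ProducerRecord("sensor-state-changes", id, value.toString))
    }
    producer.close()
  }
}

With a one-minute reporting interval and a comparable batch interval, each sensor will usually contribute at most one reading per batch, so the within-batch comparison is mostly a safety net.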
Upvotes: 3