Reputation: 1141
I am running a Spark Streaming job 24/7 and using updateStateByKey. Is it possible to run Spark Streaming 24/7? If yes, won't the state kept by updateStateByKey grow very large, and how do I handle that? Do we have to reset/delete the updateStateByKey state periodically when running 24/7? If not, how and when is it reset, or does Spark handle it in a distributed way? How can I increase the memory/storage dynamically?
I get the following errors when the updateStateByKey state grows:
Array out of bound exception
Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
How do I handle this? Please point me to any relevant docs; I am completely stuck, and any help is much appreciated. Thanks for your time.
Upvotes: 3
Views: 2551
Reputation: 21
pyspark : updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None)
Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key.
@param updateFunc: State update function. If this function returns None, then corresponding state key-value pair will be eliminated.
In other words, when updateFunc returns None for a key, that key's state entry is removed.
Upvotes: 0
Reputation: 121
Use Optional.absent() in Java and None in Scala to remove keys. A working example can be found at http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/.
Upvotes: 5
Reputation: 789
Updating a key with None will delete it from Spark. If you want to keep a key for a certain amount of time, you can attach an expiry time to its state and check it every batch.
For example, here's the code to count records by minute:
// currentMinute is the minute bucket used as the key (its derivation is not shown here)
val counts = lines.map(line => (currentMinute, 1))

val countsWithState = counts.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  if (values.isEmpty) { // every key is iterated, even if there are no records for it in this batch
    println("values is empty")
    None // returning None removes the key from Spark's state
  } else {
    println("values size " + values.size)
    Some(state.getOrElse(0) + values.sum)
  }
}
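The code above drops a key as soon as it sees an empty batch. For the expiry-time idea mentioned earlier, a minimal sketch (not from the original answer) is to keep a last-seen timestamp alongside the count and remove the key once it has been idle longer than some threshold; the (Int, Long) state shape and the maxIdleMs value below are illustrative assumptions:

// Sketch: state is (count, lastSeenMs); keys idle longer than maxIdleMs are removed
val maxIdleMs = 10 * 60 * 1000L // assumed threshold: expire keys idle for more than 10 minutes

val countsWithExpiry = counts.updateStateByKey { (values: Seq[Int], state: Option[(Int, Long)]) =>
  val now = System.currentTimeMillis()
  val (count, lastSeen) = state.getOrElse((0, now))
  if (values.nonEmpty) {
    Some((count + values.sum, now))   // new records arrived: update the count and the last-seen time
  } else if (now - lastSeen > maxIdleMs) {
    None                              // idle too long: remove the key from Spark's state
  } else {
    Some((count, lastSeen))           // no new records yet, but keep the existing state for now
  }
}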
Upvotes: 0