mithra

Reputation: 1141

Spark Streaming 24/7 with updateStateByKey issue

I am running a Spark Streaming job 24/7 and using updateStateByKey. Is it possible to run Spark Streaming 24/7? If yes, won't the state kept by updateStateByKey grow large, and how do I handle that? Do we have to reset/delete the updateStateByKey state periodically when running 24/7? If not, how and when is it reset, or does Spark handle this in a distributed way? And how can I increase the memory/storage dynamically?

I get the following errors when the updateStateByKey state grows:

An array out of bounds exception

Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4

How do I handle this? Please point me to any relevant docs; I am completely stuck and any help is much appreciated. Thanks for your time.

Upvotes: 3

Views: 2551

Answers (3)

Tom Hu

Reputation: 21

pyspark: updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None)

Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key.

@param updateFunc: State update function. If this function returns None, then corresponding state key-value pair will be eliminated.

In short: when updateFunc returns None, the state key-value pair is removed.

Upvotes: 0

Rupinder Singh

Reputation: 121

Use Optional.absent() in Java and None in Scala to remove keys. A working example can be found at http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/.
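
For illustration, here is a minimal Scala sketch of the mechanism (the DStream name and types are assumptions, not taken from the linked post); in the Java API you would return Optional.absent() in the same place:

// pairs is assumed to be a DStream[(String, Long)] of (key, count) records
val state = pairs.updateStateByKey { (values: Seq[Long], current: Option[Long]) =>
  if (values.isEmpty) None // returning None removes the key from the state
  else Some(current.getOrElse(0L) + values.sum) // Some(...) keeps the updated value
}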

Upvotes: 5

Jerry

Reputation: 789

Updating a key with None will delete it from Spark's state. If you want to keep a key for a certain amount of time, you can attach an expiry time to its state and check it every batch (see the sketch after the example below).

For example, here is code that counts records by minute:

import org.apache.spark.streaming.StreamingContext._ // Spark 1.x: brings updateStateByKey onto pair DStreams

// lines is a DStream[String]; key each record by its arrival minute
val counts = lines.map { line =>
  val currentMinute = System.currentTimeMillis() / 60000 // minute bucket
  (currentMinute, 1)
}
val countsWithState = counts.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
  if (values.isEmpty) { // every key with state is iterated, even if it has no records in this batch
    println("values is empty")
    None // returning None removes the key from Spark's state
  } else {
    println("values size " + values.size)
    Some(state.getOrElse(0) + values.sum)
  }
}
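
To implement the expiry idea, here is a minimal sketch (the 30-minute TTL and the names are illustrative): keep the last-update timestamp in the state alongside the count, and drop a key once it has been idle longer than the TTL.

// State is (count, lastUpdatedMillis); the 30-minute TTL is an arbitrary example value
val ttlMillis = 30 * 60 * 1000L
val countsWithExpiry = counts.updateStateByKey { (values: Seq[Int], state: Option[(Int, Long)]) =>
  val now = System.currentTimeMillis()
  val (count, lastSeen) = state.getOrElse((0, now))
  if (values.nonEmpty) Some((count + values.sum, now)) // new data: update count and timestamp
  else if (now - lastSeen > ttlMillis) None // idle past the TTL: remove the key
  else Some((count, lastSeen)) // idle but not yet expired: keep the state unchanged
}

This bounds the state to keys seen within the TTL window, which addresses the unbounded-growth concern in the question.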

Upvotes: 0
