Reputation: 21
I am learning Spark Streaming. I want to keep state updated, and I was able to do so with mapWithState. I also enabled checkpointing on the context. I want the state to be remembered if I have to stop and restart the job, but right now every restart begins a fresh count. I have tried various cache and checkpoint options and scanned through a lot of postings without getting a clear picture.
Environment: I am running Spark locally in development and also on the HDP sandbox (I tried both environments).
Is it possible to remember state when you kill the Spark job and restart it (without any programming changes)?
If so, how? Any pointers or suggestions would help. (I have tried checkpoints and cache on individual RDDs and on the MapWithStateRDD, both locally and on the HDP sandbox.)
The only option I have not tried is saving the MapWithStateRDD to disk and reading it back as an initialRDD. That doesn't feel like the right option anyway.
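For completeness, that untried option would look roughly like the sketch below; the snapshot path, the state types, and the names stateStream and trackStateFunc are all hypothetical, and the newest snapshot directory would have to be picked out by hand:

// Hypothetical save/restore sketch, not something I tried:
// periodically dump the state snapshots to disk...
stateStream.stateSnapshots().saveAsObjectFiles("hdfs:///tmp/state-snapshot")

// ...then on the next run, read the latest snapshot back in and seed
// mapWithState with it via StateSpec.initialState.
val initialRDD = sc.objectFile[(String, Int)]("hdfs:///tmp/state-snapshot-<latest>")
val spec = StateSpec.function(trackStateFunc).initialState(initialRDD)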
The only similar question I found has no answer: Spark Checkpoint doesn't remember state (Java HDFS).
Thanks.
Code:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Duration, StreamingContext}

// Return the active StreamingContext, or rebuild one from the checkpoint
// directory so a restarted job can recover its state.
def getStreamingContext(streamingApp: (SparkContext, Duration) => StreamingContext,
                        sc: SparkContext, batchDuration: Duration): StreamingContext = {
  val creatingFunc = () => streamingApp(sc, batchDuration)
  val ssc = sc.getCheckpointDir match {
    case Some(checkpointDir) =>
      println("Get or Create Context")
      StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc,
        sc.hadoopConfiguration, createOnError = true)
    case None =>
      println("New Context")
      StreamingContext.getActiveOrCreate(creatingFunc)
  }
  sc.getCheckpointDir.foreach(cp => ssc.checkpoint(cp))
  println(ssc.getState())
  ssc
}
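For reference, this is roughly how a stateful app gets passed into that helper; the socket source, the word-count logic, and the types below are simplified placeholders, not my actual job:

import org.apache.spark.streaming.{State, StateSpec}

// Hypothetical streamingApp: a running word count kept in mapWithState.
def streamingApp(sc: SparkContext, batchDuration: Duration): StreamingContext = {
  val ssc = new StreamingContext(sc, batchDuration)
  val lines = ssc.socketTextStream("localhost", 9999)

  // Add each new count to whatever is already in the state.
  val trackStateFunc = (word: String, one: Option[Int], state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }

  lines.flatMap(_.split(" "))
    .map((_, 1))
    .mapWithState(StateSpec.function(trackStateFunc))
    .print()

  ssc
}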
Spark version 2.1.0
Upvotes: 1
Views: 1682
Reputation: 21
I got it working, thanks to the following Q/A: Spark streaming not remembering previous state.
I was missing the following line after the updateStateByKey call:
statefulActivity.checkpoint(Minutes(1))
Setting a checkpoint interval, in addition to enabling the checkpoint directory, made the state survive restarts.
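For anyone hitting the same thing, here is roughly where that line goes; the word-count update function and the lines stream below are simplified placeholders:

import org.apache.spark.streaming.Minutes

// Hypothetical stateful word count via updateStateByKey.
val updateFunc = (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))

val statefulActivity = lines.flatMap(_.split(" "))
  .map((_, 1))
  .updateStateByKey[Int](updateFunc)

// The missing piece: an explicit checkpoint interval on the stateful
// DStream, in addition to ssc.checkpoint(checkpointDir).
statefulActivity.checkpoint(Minutes(1))
statefulActivity.print()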
Upvotes: 1