Reputation: 495
I have a Spark Streaming application that uses stateful transformations quite a bit. In retrospect, Spark might not have been the best choice, but I'm still trying to make it work.
My question is: why do my MapWithStateRDDs take up so much memory? As an example, I have a transformation whose state is around 1.5 GB in memory, and I see that same RDD being stored again for each batch. So after the 3rd batch, the UI shows 3 MapWithStateRDDs of the exact same size, even though the state didn't change in those batches. Do these actually take up 3x the space? That seems like a huge waste; shouldn't it only store the deltas until a checkpoint and then compact them into one RDD, or something like that? I assumed that's how it works, and having multiple stateful transformations eats up a lot of memory.
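For reference, a minimal sketch of the kind of stateful pipeline I mean (the class name, socket source, keyed running count, and 10-second batch interval are just placeholders, not my actual app): each batch adds another MapWithStateRDD under the Storage tab.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class StatefulCounts {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StatefulCounts").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));
        ssc.checkpoint("/tmp/spark-checkpoint"); // mapWithState requires a checkpoint directory

        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);

        // Running count per key, carried across batches in state
        Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> mappingFunc =
            (word, one, state) -> {
                long sum = one.orElse(0L) + (state.exists() ? state.get() : 0L);
                state.update(sum);
                return new Tuple2<>(word, sum);
            };

        JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> counts =
            lines.mapToPair(word -> new Tuple2<>(word, 1L))
                 .mapWithState(StateSpec.function(mappingFunc));

        counts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}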
Upvotes: 1
Views: 257
Reputation: 549
As pointed out in the link in the comments, this happens because mapWithState only checkpoints the state every 10 batches by default, so it keeps the intermediate RDDs cached until that point.
To get rid of this wasted space, you can checkpoint the state at every batch instead. In my case that turned out not to be very expensive.
JavaInputDStream<ConsumerRecord<String, Object>> rtStream = ...
JavaMapWithStateDStream<String, Object, Object, Tuple2<String, Object>> mapWithStateStream =
        rtStream.mapToPair(...).mapWithState(...);
// keep this equal to your batch interval, or adjust to your requirements
mapWithStateStream.checkpoint(Durations.seconds(10));
mapWithStateStream.foreachRDD(...your logic here...);
And there you go, magic! You no longer see those irritating MapWithStateRDDs in the "Storage" tab.
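One caveat: this assumes a checkpoint directory is already set on the StreamingContext, which mapWithState requires in any case. A minimal sketch, with a placeholder path:

// mapWithState requires checkpointing to be enabled on the context;
// the local path below is just a placeholder (use a reliable store like HDFS in production)
JavaStreamingContext ssc = ... // your existing streaming context
ssc.checkpoint("/tmp/spark-checkpoint");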
Upvotes: 1