Reputation: 13
I have a sorted dataset which is updated (filtered) inside a loop, according to the value at the head of the dataset.
If I cache the dataset every n (e.g., 50) iterations, performance is good.
However, after a certain number of iterations the cache seems to stop working, because the program slows down (I guess the memory assigned to caching fills up).
I would like to know if and how it is possible to keep only the updated dataset in cache, so that memory does not fill up and performance stays good. Please find below an example of my code:
dataset = dataset.sort(/* sort condition */)
dataset.cache()
var head = dataset.head(1)
var count = 0
while (head.nonEmpty) {
  count += 1
  /* custom operation with the head */
  dataset = dataset.filter(/* filter condition based on the head of the dataset */)
  if (count % 50 == 0) {
    dataset.cache()
  }
  head = dataset.head(1)
}
Upvotes: 1
Views: 1301
Reputation: 74
Try unpersisting the dataset before caching it again; this way you remove the old copy of the dataset from memory and keep only the latest one, avoiding multiple copies in memory. Below is a sample, but you will have to place dataset.unpersist() in the correct location based on your code logic:
if (count % 50 == 0) {
  dataset.unpersist()
}
dataset = dataset.filter(/* filter condition based on the head of the dataset */)
if (count % 50 == 0) {
  dataset.cache()
}
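A minimal sketch of one possible ordering inside the loop (using the same variable names and placeholder conditions as the question; this ordering is an assumption, not part of the answer above): keep a reference to the old cached copy, cache and materialize the new dataset, and only then unpersist the old one, so the new filter is not recomputed from an already-evicted parent.
if (count % 50 == 0) {
  val previous = dataset  // handle on the currently cached copy
  dataset = dataset.filter(/* filter condition based on the head of the dataset */)
  dataset.cache()
  head = dataset.head(1)  // materialize the new cached copy first
  previous.unpersist()    // then drop the old copy from memory
} else {
  dataset = dataset.filter(/* filter condition based on the head of the dataset */)
  head = dataset.head(1)
}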
Upvotes: 0
Reputation: 35249
cache alone won't help you here. With each iteration the lineage and execution plan grow, and that is not something which can be addressed by caching alone.
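If you want to see this happening, you can print the plan every few iterations and watch it get longer; a small sketch, assuming the loop from the question:
if (count % 50 == 0) {
  dataset.explain(true)  // logical and physical plans grow with each iteration
}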
You should at least break the lineage:
if (count % 50 == 0) {
  dataset.cache()
  dataset = dataset.checkpoint()
}
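Note that checkpoint needs a checkpoint directory to be configured once, before the loop; a one-line sketch (the path is just a placeholder):
spark.sparkContext.setCheckpointDir("/some/checkpoint/dir")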
although personally I would also write the data out to distributed storage and read it back:
if (count % 50 == 0) {
  dataset.write.parquet(s"/some/path/$count")
  dataset = spark.read.parquet(s"/some/path/$count")
}
It might not be acceptable depending on your deployment, but in many cases it behaves more predictably than caching and checkpointing.
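For completeness, a rough sketch of how this round trip could sit inside the question's loop (the base path and the 50-iteration interval are assumptions carried over from the question, and the placeholder conditions are the question's own):
dataset = dataset.sort(/* sort condition */)
var head = dataset.head(1)
var count = 0
while (head.nonEmpty) {
  count += 1
  /* custom operation with the head */
  dataset = dataset.filter(/* filter condition based on the head of the dataset */)
  if (count % 50 == 0) {
    // The write/read round trip truncates the lineage completely,
    // so the plan stops growing across iterations.
    dataset.write.parquet(s"/some/path/$count")
    dataset = spark.read.parquet(s"/some/path/$count")
  }
  head = dataset.head(1)
}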
Upvotes: 1