Reputation: 189
I have a Spark job that randomly samples my input data. Then I generate a Bloom filter for the input data. Finally, I apply the filter and join the data with dataset A.
Since the sampling is random, it should be executed only once.
But it executes twice even though I persist it. I can see a green cached stage in the Spark DAG of the first job, but the join still starts from data loading and random sampling. I also found that the cached data can be evicted when workers run out of memory, which I did not expect.
Here is my code:
// rawData is an RDD
val rawData = loadData("/path/to/data").filter(randomSampling).persist(StorageLevel.MEMORY_AND_DISK_SER)
val myFilter = getMyBloomFilter(rawData)
// apply bloom filter and join input data with dataset A
val dataWithA = applyFilterAndJoin(loadData, myFilter)
How can I force Spark to execute some transformations only once, even if the workers do not have enough memory to cache the result?
Thanks!
Upvotes: 1
Views: 610
Reputation: 677
Try writing the sampled data to an output location (HDFS / S3, or a local filesystem), then re-read that output for the next stages. That way the output of your sampling step stays intact instead of being evicted or recomputed, because the downstream lineage starts at the written files rather than at the load-and-sample steps.
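A minimal sketch of that approach, assuming the sampled RDD holds serializable elements; the helper name materializeSample and the output path are placeholders, not part of your original code:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Write the sampled RDD to durable storage, then rebuild an RDD whose
// lineage starts at those files. Downstream actions read the files
// instead of re-running loadData and the random sampling, and the data
// survives executor memory pressure, unlike an in-memory cache.
def materializeSample[T: ClassTag](sc: SparkContext, sampled: RDD[T], path: String): RDD[T] = {
  sampled.saveAsObjectFile(path) // action: runs the sampling exactly once
  sc.objectFile[T](path)         // new lineage rooted at the written files
}

// Usage following the question's code (names and paths are placeholders):
// val rawData = materializeSample(sc, loadData("/path/to/data").filter(randomSampling), "/path/to/sampled")
// val myFilter = getMyBloomFilter(rawData)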
Upvotes: 2