jybsuper

Reputation: 189

How to force Spark to execute a transformation only once?

I have a Spark job which samples my input data randomly. Then I generate a bloom filter from the sampled data. Finally, I apply the filter and join the data with dataset A.

Since the sampling is random, it should be executed only once.

But it executes twice even if I persist it. I can see a green cache step in the Spark DAG of the first job, but the join still starts from data loading and random sampling. I also found that the cached data can be evicted when workers run out of memory, which I did not expect.

Here is my code:

import org.apache.spark.storage.StorageLevel

// raw data is an RDD
val rawData = loadData("/path/to/data")
  .filter(randomSampling)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// build a bloom filter from the sampled data
val myFilter = getMyBloomFilter(rawData)

// apply the bloom filter and join the sampled data with dataset A
val dataWithA = applyFilterAndJoin(rawData, myFilter)

How can I force Spark to execute some transformations only once, even if the workers do not have enough memory to cache the results?

Thanks!

Upvotes: 1

Views: 610

Answers (1)

Ashwanth Kumar

Reputation: 677

Try writing the sampled RDD to an output (HDFS / S3 or a local filesystem), then re-read that output for the next stages. That way the result of your sampling step stays intact regardless of memory pressure.
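For example, a minimal sketch reusing the helpers from the question (loadData, randomSampling, getMyBloomFilter, applyFilterAndJoin); the output path, the MyRecord element type and the objectFile format are assumptions, and sc is the existing SparkContext:

// materialize the random sample once so later stages never re-run it
val sampledPath = "/path/to/sampled-output" // hypothetical HDFS/S3 location

loadData("/path/to/data")
  .filter(randomSampling)
  .saveAsObjectFile(sampledPath) // action: runs the sampling exactly once

// re-read the materialized sample; its lineage now starts at the files,
// so the bloom filter and the join cannot trigger another sampling pass
val sampled = sc.objectFile[MyRecord](sampledPath)
val myFilter = getMyBloomFilter(sampled)
val dataWithA = applyFilterAndJoin(sampled, myFilter)

The trade-off is one extra write and read of the sampled data, but because the downstream lineage begins at the saved files, nothing upstream of the sampling can be recomputed, even when cached partitions are evicted.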

Upvotes: 2
