zickr sivolin

Reputation: 115

Will a persisted-dataframe be calculated repeatedly many times?

I've got the following structured query:

val A = 'load somedata from  HDFS'.persist(StorageLevel.MEMORY_AND_DISK_SER)
val B = A.filter('condition 1')
val C = A.filter('condition 2')
val D = A.filter('condition 3')
val E = A.filter('condition 4')
val F = A.filter('condition 5')
val G = A.filter('condition 6')
val H = A.filter('condition 7')

val I = B.union(C).union(D).union(E).union(F).union(G).union(H)

I persist the dataframe A so that when I use B/C/D/E/F/G/H, the dataframe A should be calculated only once. But the DAG of this job is shown below:

[screenshot: DAG of the job]

From the DAG above, it seems that stages 6-12 are all executed and the dataframe A is calculated 7 times.

Why would this happen?

Or maybe the DAG is misleading? I noticed that there are no lines entering the top of stages 7-12, whereas stage 6 does have two lines coming in from other stages.

I didn't list all the operations. After the union, I save the dataframe I to HDFS. Will this action on the dataframe I actually trigger the persist? Or must I run an action such as count on the dataframe A to trigger the persist before reusing A?

Upvotes: 1

Views: 2132

Answers (1)

Jacek Laskowski

Reputation: 74739

The following line alone won't persist your dataset.

val A = 'load somedata from  HDFS'.persist(StorageLevel.MEMORY_AND_DISK_SER)

Caching/persistence is lazy when used with the Dataset API, so you have to trigger the caching with the count operator (or a similar action) that in turn submits a Spark job.
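A minimal sketch of that laziness, assuming a local SparkSession (the `local[*]` master and the range dataset are placeholders for the real HDFS load):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("persist-demo")
  .getOrCreate()

// persist only *marks* the Dataset for caching; nothing is computed yet
val a = spark.range(100).toDF("id").persist(StorageLevel.MEMORY_AND_DISK_SER)

// an action (count) submits a job and materializes the cache;
// later operators can then read the in-memory copy
a.count()
```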

After that, all the following operators, filter included, should use InMemoryTableScan with the green dot in the plan (as shown below).

[screenshot: query plan with InMemoryTableScan nodes marked by green dots]

In your case, even after the union, the dataset I is not cached since you have not triggered the caching (you have merely marked the dataset for caching).

After union operation, I save the I dataframe to HDFS. Will this action on the I dataframe make the persist operation be done really?

Yes. Only actions (like saving to an external storage) can trigger the persistence for future reuse.

Or must I do an action operation such as count on the A dataframe to trigger the persist operation before reuse A dataframe?

That's the point! In your case, since you want to reuse the dataframe A across the filter operators, you should persist it first, then count (to trigger the caching), and only then run the filters.
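Applied to the query in the question, that ordering might look like the sketch below (the range dataset, the modulo conditions, and the output path are all placeholders for the real load, filter conditions, and HDFS destination):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("reuse-demo")
  .getOrCreate()
import spark.implicits._

// placeholder for 'load somedata from HDFS'
val A = spark.range(70).toDF("id").persist(StorageLevel.MEMORY_AND_DISK_SER)
A.count() // trigger the caching *before* A is reused

// each filter now scans the cached copy of A instead of recomputing the load
val B = A.filter($"id" % 7 === 0) // placeholder for 'condition 1'
val C = A.filter($"id" % 7 === 1) // placeholder for 'condition 2'
// ... conditions 3 through 7 analogous ...

val I = B.union(C)
I.write.mode("overwrite").parquet("/tmp/persist-demo-out") // the save is the action on I
```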

As written, no filter will benefit from any performance increase due to persist. That persist has practically no impact on performance and just makes a code reviewer think otherwise.

If you want to see when and whether your dataset is cached, you can check the Storage tab in the web UI or ask the CacheManager about it:

scala> val nums = spark.range(5).cache
scala> nums.count
scala> spark.sharedState.cacheManager.lookupCachedData(nums)
res0: Option[org.apache.spark.sql.execution.CachedData] =
Some(CachedData(Range (0, 5, step=1, splits=Some(8))
,InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *Range (0, 5, step=1, splits=8)
))

Upvotes: 3
