Reputation: 115
I've got the following structured query:
val A = "load somedata from HDFS".persist(StorageLevel.MEMORY_AND_DISK_SER)
val B = A.filter("condition 1")
val C = A.filter("condition 2")
val D = A.filter("condition 3")
val E = A.filter("condition 4")
val F = A.filter("condition 5")
val G = A.filter("condition 6")
val H = A.filter("condition 7")
val I = B.union(C).union(D).union(E).union(F).union(G).union(H)
I persist the dataframe A so that, when I use B/C/D/E/F/G/H, the dataframe A should be calculated only once. But the DAG of this job is below:
From the DAG above, it seems that stages 6-12 are all executed and the dataframe A is calculated 7 times. Why would this happen?
Or is the DAG just misleading? I noticed that there are no lines into the top of stages 7-12, whereas stage 6 does have two lines coming in from other stages.
I didn't list all the operations. After the union operation, I save the I dataframe to HDFS. Will this action on the I dataframe really make the persist operation happen? Or must I run an action such as count on the A dataframe to trigger the persist operation before reusing the A dataframe?
Upvotes: 1
Views: 2132
Reputation: 74739
Doing the following line won't persist your dataset.
val A = "load somedata from HDFS".persist(StorageLevel.MEMORY_AND_DISK_SER)
Caching/persistence is lazy when used with the Dataset API, so you have to trigger the caching with the count operator (or similar), which in turn submits a Spark job.
After that, all the following operators, filter included, should use InMemoryTableScan, with the green dot in the plan (as shown below).
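For example, here is a minimal sketch of how to verify this (the dataset is illustrative, assuming the usual SparkSession named spark):
import org.apache.spark.storage.StorageLevel

val ds = spark.range(10).persist(StorageLevel.MEMORY_AND_DISK_SER)

ds.count  // an action: materializes the cached data

// Downstream operators now read from the cache; look for
// InMemoryTableScan / InMemoryRelation in the printed plan.
ds.filter("id > 5").explain()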
In your case, even after the union, the dataset I is not cached since you have not triggered the caching (but merely marked it for caching).
After the union operation, I save the I dataframe to HDFS. Will this action on the I dataframe really make the persist operation happen?
Yes. Only actions (like saving to external storage) can trigger the persistence for future reuse.
Or must I run an action such as count on the A dataframe to trigger the persist operation before reusing the A dataframe?
That's the point! In your case, since you want to reuse the A dataframe across the filter operators, you should persist it first, then count (to trigger the caching), and only then run the filters; see the sketch below.
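Applied to your query, that would look something like the following (a sketch only; the load and the filter conditions are placeholders carried over from your example):
import org.apache.spark.storage.StorageLevel

val A = spark.read.load("hdfs://...")  // placeholder for your HDFS load
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

A.count  // action: triggers the caching, so A is computed exactly once

// Every filter below now scans the cached A instead of recomputing it.
val B = A.filter("condition 1")
val C = A.filter("condition 2")
// ...and so on for D through H...

val I = B.union(C)  // .union(D)... as in your query
I.write.save("hdfs://...")  // placeholder for saving I to HDFS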
As your code stands, no filter will benefit from any performance increase due to persist. That persist is practically void of any impact on performance and just makes a code reviewer think otherwise.
If you want to see when and whether your dataset is cached, you can check the Storage tab in the web UI or ask the CacheManager about it:
val nums = spark.range(5).cache
nums.count
scala> spark.sharedState.cacheManager.lookupCachedData(nums)
res0: Option[org.apache.spark.sql.execution.CachedData] =
Some(CachedData(Range (0, 5, step=1, splits=Some(8))
,InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 5, step=1, splits=8)
))
Upvotes: 3