Kyle Murray

Reputation: 73

In Spark, is there any way to unpersist a DataFrame/RDD in the middle of an execution plan?

Given the following series of events:

df1 = read
df2 = df1.action
df3 = df1.action
df2a = df2.action
df2b = df2.action
df3a = df3.action
df3b = df3.action
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()

The data forks twice, so df1 will be read four times. I therefore want to persist the data. From what I understand, this is the way to do so:

df1 = read
df1.persist()
df2 = df1.action
df3 = df1.action
df2.persist()
df3.persist()
df2a = df2.action
df2b = df2.action
df3a = df3.action
df3b = df3.action
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()
df1.unpersist()
df2.unpersist()
df3.unpersist()

However, this keeps all three persisted at once, which isn't storage efficient given that I no longer need df1 persisted once df2 and df3 have both been created. I'd like to order it more like this:

df1 = read
df1.persist()
df2 = df1.action
df3 = df1.action
df1.unpersist()
df2.persist()
df3.persist()
df2a = df2.action
df2b = df2.action
df2.unpersist()
df3a = df3.action
df3b = df3.action
df3.unpersist()
df4 = union(df2a, df2b, df3a, df3b)
df4.collect()

However, this just leads to the data not being persisted at all, because persist() is lazy and I need to trigger an action before unpersisting. Is there any way to accomplish what I'm looking for (unpersisting intermediate DataFrames in the middle of the execution plan)?
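
For concreteness, here is a minimal runnable PySpark sketch of that failure mode (the spark.range source and the filter call are illustrative stand-ins, not my real job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.range(100)
df1.persist()
print(df1.storageLevel)   # non-default level: df1 is marked for caching

df2 = df1.filter("id % 2 = 0")
df1.unpersist()           # no action has run yet, so nothing was ever cached
print(df1.storageLevel)   # back to the default: the mark is simply gone

df2.count()               # recomputes df1 from scratch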

Upvotes: 1

Views: 1626

Answers (1)

Chitral Verma

Reputation: 2855

This is not possible, but the steps can be rearranged slightly for the better.

Transformations only build the DAG; nothing executes until an action is triggered, and that is when the actual persistence happens. If a cached parent RDD is unpersisted, then all of its cached child RDDs are unpersisted as well. This is a design choice that favors correctness and consistency of the data, and it is the reason your data is not being persisted at all.
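
To illustrate the laziness (a minimal sketch, assuming an existing SparkSession named spark):

df = spark.range(10 ** 7)
df.persist()   # nothing is cached yet, only a mark on the plan
df.count()     # first action: executes the plan and fills the cache
df.count()     # served from the cache, so noticeably faster
df.unpersist()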

Slightly improving your steps:

df1 = read
df1.persist() 

df2 = df1.action # after this df1 will be persisted
df3 = df1.action # this will be faster as df1 is cached

df2.persist()
df3.persist()

# perform one action on each of df2 and df3 to trigger their caching
df2a = df2.action
df3a = df3.action

df2b = df2.action # this will be faster as df2 is cached
df3b = df3.action # this will be faster as df3 is cached

df4 = union(df2a, df2b, df3a, df3b)
df4.collect()

df1.unpersist() # this, along with its cached dependents, will get unpersisted
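
For reference, a runnable PySpark version of the rearranged steps; the filter/select transformations and the count() actions here are hypothetical stand-ins for your .action calls:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.range(1_000_000)        # stands in for `read`
df1.persist()

df2 = df1.filter("id % 2 = 0")
df3 = df1.filter("id % 2 = 1")

df2.persist()
df3.persist()

df2a = df2.selectExpr("id * 2 AS v")
df2b = df2.selectExpr("id + 1 AS v")
df3a = df3.selectExpr("id * 3 AS v")
df3b = df3.selectExpr("id - 1 AS v")

# one action per cached branch: the first materializes the caches of
# df1 and df2, the second materializes the cache of df3
df2a.count()
df3a.count()

df4 = df2a.union(df2b).union(df3a).union(df3b)
df4.collect()

# explicit unpersists shown for safety; whether unpersisting df1 alone
# cascades to df2 and df3 depends on the Spark version (see references)
df1.unpersist()
df2.unpersist()
df3.unpersist()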

Related References:

  1. https://github.com/apache/spark/pull/17097
  2. https://issues.apache.org/jira/browse/SPARK-21579

Upvotes: 2
