Reputation: 781
I have the following strategy to transform a dataframe df:
df = T1(df)
df.cache()
df = T2(df)
df.cache()
.
.
.
df = Tn(df)
df.cache()
Here T1, T2, ..., Tn are n transformations that return Spark dataframes. Repeated caching is used because df has to pass through a lot of transformations and is used multiple times in between; without caching, the lazy evaluation of the transformations could make each of those intermediate uses of df very slow. What I am worried about is that the n dataframes cached one by one will gradually consume the RAM. I have read that Spark automatically un-caches the "least recently used" items. Based on this I have the following queries -
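For concreteness, a minimal runnable version of this pattern looks like the following; T1 and T2 are placeholder transformations standing in for my real ones:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# placeholder transformations standing in for the real T1, ..., Tn
def T1(d: DataFrame) -> DataFrame:
    return d.withColumn("x", F.col("id") * 2)

def T2(d: DataFrame) -> DataFrame:
    return d.filter(F.col("x") > 4)

df = spark.range(10)   # small example input

df = T1(df)
df.cache()             # lazy: the cache is only filled by the first action
df.count()             # df is "used in between" here, materialising the cache

df = T2(df)
df.cache()
df.count()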
Upvotes: 2
Views: 2480
Reputation: 305
How is "least recently used" determined? I hope that a dataframe with no reference or evaluation strategy attached to it qualifies as unused - am I correct?
Results are cached on the Spark executors. A single executor runs multiple tasks and can hold multiple cached blocks in its memory at a given point in time. Within an executor, the cached blocks are ranked by how recently they were accessed: a cache that was just used in some computation always has rank 1, and the others are pushed down. When the available storage memory is full, the lowest-ranked (least recently used) cache is dropped to make space for a new one.
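As a small sketch of checking and controlling what is asked to be held in executor memory (whether a block is actually resident is still up to the executor's eviction policy):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100)

df.persist(StorageLevel.MEMORY_ONLY)  # memory-only blocks are the ones subject to LRU eviction
df.count()                            # action: populates the cache on the executors
print(df.storageLevel)                # shows the level the blocks were requested at
df.unpersist()                        # explicit release instead of waiting for LRU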
Does a Spark dataframe with no reference or evaluation strategy attached to it get selected for garbage collection as well? Or does a Spark dataframe never get garbage collected?
A dataframe is an execution expression: unless an action is called, no computation is materialised. Moreover, everything is cleared once the executor finishes the computation for that task. Only when a dataframe is cached (before calling an action) are the results kept aside in executor memory for further use, and those cached results are evicted based on LRU.
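A short sketch of that laziness, assuming nothing beyond a plain local session:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(1000)
df = df.withColumn("double", F.col("id") * 2)  # no job runs: this only extends the plan
df = df.filter(F.col("double") % 4 == 0)       # still only a plan
df.cache()                                     # also lazy: nothing is stored yet

n = df.count()                                 # first action: the plan executes and the cache fills
m = df.count()                                 # second action: answered from the cached results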
Based on the answers to the above two queries, is the above strategy correct?
In your example the transformations are applied in sequence and the reference to the previous dataframe is never used again (so it is unclear why you are caching at all). If multiple executions are handled by the same executor, it is possible that some cached results are dropped, and when they are asked for again they will be re-computed.
N.B. - Nothing is executed until a Spark action is called. Transformations are chained and optimised by the Spark engine when an action is called.
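To illustrate when cache does pay off: it helps when one dataframe feeds more than one downstream computation, as in this sketch (the transformation and column names are made up for the example):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

base = spark.range(100).withColumn("v", F.col("id") % 3)  # stand-in for T1(df)
base.cache()                                 # reused twice below, so caching avoids recomputation

high = base.filter(F.col("v") > 0).count()   # first action: computes and caches base
counts = base.groupBy("v").count().collect() # second action: reads base from the cache

base.unpersist()                             # release the memory once both uses are done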
Upvotes: 4
Reputation: 146
In my experience with Spark, and based on the communication I have had with Cloudera, we should unpersist/uncache the data once we are done with it; if we do not, the job will start to slow down, and the problem becomes more severe in the case of a streaming job.
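A hedged sketch of that unpersist discipline for a pipeline like the one in the question (T1 and T2 stand in for the real transformations; the count() is just an action to materialise each stage before the previous one is released):

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# placeholders for the real transformation chain T1, ..., Tn
def T1(d: DataFrame) -> DataFrame:
    return d.withColumn("x", F.col("id") + 1)

def T2(d: DataFrame) -> DataFrame:
    return d.filter(F.col("x") % 2 == 0)

df = spark.range(100)
prev = None
for T in (T1, T2):
    df = T(df)
    df.cache()
    df.count()            # action: materialises the new stage's cache
    if prev is not None:
        prev.unpersist()  # release the previous stage once the new one is cached
    prev = df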
I have nothing to support my answer, but read here and here for details.
Upvotes: 1