林鼎棋

Reputation: 2075

Spark DataFrame suddenly becomes very slow when I reuse old cached data iteratively too many times

The problem happens when I keep my cached results in a List and compute a new DataFrame from all the data in that list in each iteration. However, even though I use an empty DataFrame and get an empty result each time, the function suddenly gets very slow after about 8~12 rounds.

Here is my code:

testLoop(Nil)
def testLoop(lastDfList: List[DataFrame]): Unit = {
  // do some dummy transformation like union and cache the result
  val resultDf = lastDfList.foldLeft(Seq[Data]().toDF) { (df, lastDf) => df.union(lastDf) }.cache

  // always get 0, of course  
  println(resultDf.count)  

  // benchmark action
  benchmark(resultDf.count)    

  testLoop(resultDf::lastDfList)
}

The benchmark results:

- rounds 1~6: < 200 ms
- round 7: 367 ms
- round 8: 918 ms
- round 9: 2476 ms
- round 10: 7833 ms
- round 11: 24231 ms

I don't think GC or block eviction is the problem in my case since I already use an empty DataFrame, but I don't know what the cause is. Do I misunderstand the meaning of cache, or something else?

Thanks!


After reading ImDarrenG's solution, I changed my code to the following:

spark.sparkContext.setCheckpointDir("/tmp")

testLoop(Nil)
def testLoop(lastDfList: List[DataFrame]): Unit = {
  // do some dummy transformation like union and cache the result
  val resultDf = lastDfList.foldLeft(Seq[Data]().toDF) { (df, lastDf) => df.union(lastDf) }.cache

  resultDf.checkpoint()  

  // always get 0, of course  
  println(resultDf.count)  

  // benchmark action
  benchmark(resultDf.count)    

  testLoop(resultDf::lastDfList)
}

But it still becomes very slow after a few iterations.

Upvotes: 2

Views: 1843

Answers (2)

hao xie

Reputation: 11

According to the API docs and source code, checkpoint returns a new Dataset instead of changing the original Dataset.
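
In other words, the edited code above calls checkpoint() but discards its return value, so resultDf keeps its full lineage. A minimal sketch of capturing and using the returned Dataset instead (the name checkpointedDf is mine, not from the original code):

// checkpoint() is not a mutator: keep working with the Dataset it
// returns, otherwise the lineage is never truncated.
val checkpointedDf = resultDf.checkpoint()

println(checkpointedDf.count)
testLoop(checkpointedDf :: lastDfList)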

Upvotes: 1

ImDarrenG

Reputation: 2345

Here you create a list of DataFrames by adding resultDf to the beginning of lastDfList and pass that to the next iteration of testLoop:

testLoop(resultDf::lastDfList)

So lastDfList gets longer with each pass.

This line creates a new DataFrame by unioning each member of lastDfList:

val resultDf = lastDfList.foldLeft(Seq[Data]().toDF) { (df, lastDf) => df.union(lastDf) }.cache

Each member of lastDfList is a union of its predecessors; therefore, Spark maintains a lineage that becomes exponentially larger with each pass of testLoop.

I expect that the increase in time is caused by the housekeeping of the DAG. Caching the DataFrames removes the need to repeat the transformations, but the lineage must still be maintained by Spark.

Cached data or not, it looks like you are building a really complex DAG by unioning each DataFrame with all of its predecessors on each pass of testLoop.
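
One way to see this (a hypothetical probe, not part of the original post) is to print the size of the logical plan each round; under the recursion above it roughly doubles per pass, even though every DataFrame is empty:

// Hypothetical probe: numberedTreeString renders the logical plan one
// node per line, so the line count approximates the number of plan nodes.
val planNodes = resultDf.queryExecution.logical.numberedTreeString.linesIterator.size
println(s"plan size this round: $planNodes nodes")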

You could use checkpoint to trim the lineage, and introduce a check to prevent infinite recursion.
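
A minimal sketch of how that might look (the round/maxRounds termination check is my addition, and it assumes the same Data case class, spark session, and spark.implicits._ import as the question):

spark.sparkContext.setCheckpointDir("/tmp")

def testLoop(lastDfList: List[DataFrame], round: Int = 0, maxRounds: Int = 20): Unit = {
  if (round < maxRounds) { // check to prevent infinite recursion
    val unioned = lastDfList.foldLeft(Seq[Data]().toDF) { (df, lastDf) => df.union(lastDf) }

    // checkpoint() materializes the data and returns a *new* Dataset whose
    // lineage starts at the checkpoint files, so the DAG stays flat.
    val resultDf = unioned.checkpoint()

    println(resultDf.count)
    testLoop(resultDf :: lastDfList, round + 1, maxRounds)
  }
}

testLoop(Nil)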

Upvotes: 3
