Reputation: 2075
The problem happens when I try to keep my cached results in a List and, on each iteration, compute a new DataFrame from all the data accumulated in that list. However, even though I use an empty DataFrame and get an empty result every time, the function suddenly gets very slow after about 8~12 rounds.
Here is my code:
def testLoop(lastDfList: List[DataFrame]): Unit = {
  // do some dummy transformation like union, and cache the result
  // (Seq[Data]().toDF needs import spark.implicits._ and a Data case class)
  val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){ (df, lastDf) => df.union(lastDf) }.cache
  // always get 0, of course
  println(resultDf.count)
  // benchmark action
  benchmark(resultDf.count)
  testLoop(resultDf :: lastDfList)
}

testLoop(Nil)
The benchmark results:
rounds 1~6 : < 200ms
round 7    : 367ms
round 8    : 918ms
round 9    : 2476ms
round 10   : 7833ms
round 11   : 24231ms
I don't think GC or block eviction is the problem in my case, since I'm already using an empty DataFrame, but I don't know what the cause is. Do I misunderstand the meaning of cache, or something?
Thanks!
After reading ImDarrenG's solution, I changed my code to the following:
spark.sparkContext.setCheckpointDir("/tmp")

def testLoop(lastDfList: List[DataFrame]): Unit = {
  // do some dummy transformation like union, and cache the result
  val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){ (df, lastDf) => df.union(lastDf) }.cache
  resultDf.checkpoint()
  // always get 0, of course
  println(resultDf.count)
  // benchmark action
  benchmark(resultDf.count)
  testLoop(resultDf :: lastDfList)
}

testLoop(Nil)
But it still becomes very slow after a few iterations.
Upvotes: 2
Views: 1843
Reputation: 11
According to the API and the source code, checkpoint returns a new Dataset instead of modifying the original Dataset.
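So you need to use the Dataset it returns; for example (a minimal sketch of the relevant lines):

// checkpoint() does not mutate resultDf; capture and use its return value
val checkpointedDf = resultDf.checkpoint()
println(checkpointedDf.count)
testLoop(checkpointedDf :: lastDfList)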
Upvotes: 1
Reputation: 2345
Here you create a list of DataFrames by adding resultDf to the beginning of lastDfList, and pass that to the next iteration of testLoop:

testLoop(resultDf::lastDfList)

So lastDfList gets longer with each pass.
This line creates a new DataFrame by unioning each member of lastDfList:

val resultDf = lastDfList.foldLeft(Seq[Data]().toDF){ (df, lastDf) => df.union(lastDf) }.cache
Each member of lastDfList is a union of its predecessors; therefore, Spark is maintaining a lineage that becomes exponentially larger with each pass of testLoop.
I expect that the increase in time is caused by the housekeeping of the DAG. Caching the DataFrames removes the need to repeat the transformations, but the lineage must still be maintained by Spark.
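You can watch this happening by printing the plan on each pass (explain(true) is not in your original code, but it prints the extended query plan):

// the printed plan grows with every round, even though the data stays empty
resultDf.explain(true)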
Cached data or not, it looks like you are building a really complex DAG by unioning each DataFrame with all of its predecessors on each pass of testLoop.
You could use checkpoint to trim the lineage, and introduce some check to prevent infinite recursion; see the sketch below.
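For example (the round counter and its limit are my additions, not from your code; Data is your case class):

import spark.implicits._  // needed for Seq[Data]().toDF

spark.sparkContext.setCheckpointDir("/tmp")

def testLoop(lastDfList: List[DataFrame], round: Int = 0): Unit = {
  if (round < 20) { // guard against infinite recursion; the limit is arbitrary
    val unioned = lastDfList.foldLeft(Seq[Data]().toDF){ (df, lastDf) => df.union(lastDf) }
    // checkpoint() is eager by default and returns a NEW Dataset with a
    // truncated lineage; keep and reuse the returned value, not the original
    val resultDf = unioned.checkpoint()
    benchmark(resultDf.count)
    testLoop(resultDf :: lastDfList, round + 1)
  }
}

testLoop(Nil)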
Upvotes: 3