Reputation: 353
I have a program that generates a DataFrame, on which it will then run something like
select(Col1, Col2, ...).orderBy(ColX).limit(N)
However, when I collect the data at the end, I find that it causes the driver to OOM if I take a large enough top N.
Another observation is that if I do just the sort or just the top, this problem does not happen. It happens only when sort and top are used at the same time.
I am wondering why this could be happening. In particular, what is really going on underneath this combination of the two transformations? How does Spark evaluate a query with both sorting and a limit, and what is the corresponding execution plan?
Also, just curious: does Spark handle sort and top differently between DataFrames and RDDs?
EDIT: Sorry, I didn't mean collect. What I originally meant is that this happens when I call any action to materialize the data, whether or not it is collect (or any other action sending data back to the driver). So the problem is definitely not the output size.
Upvotes: 4
Views: 6507
Reputation: 330093
While it is not clear why this fails in this particular case, there are multiple issues you may encounter:

- limit simply puts all data on a single partition, no matter how big n is. So while it doesn't explicitly collect, it is almost as bad (see the sketch after this list).
- orderBy requires a full shuffle with range partitioning, which can result in different issues when the data distribution is skewed.
- collect results can be larger than the amount of memory available on the driver.
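For illustration, a minimal sketch (not from the original answer; df, ColX, and n are toy stand-ins) of how to inspect what Spark actually plans for sort + limit, and to confirm the single-partition behavior of limit:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("sort-limit-plan").getOrCreate()
    import spark.implicits._

    // Toy stand-ins for the question's DataFrame and top-N size.
    val df = spark.range(0L, 1000000L).toDF("ColX")
    val n = 100000

    // Print the physical plan Spark chooses for the sort + limit combination.
    df.orderBy($"ColX").limit(n).explain()

    // limit on its own funnels the data into one partition, which is easy to check:
    println(df.limit(n).rdd.getNumPartitions) // typically 1, depending on Spark version

explain() prints the physical plan, so you can see exactly which operators your Spark version picks for this query.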
If you collect anyway, there is not much you can improve here. At the end of the day driver memory will be a limiting factor, but there are still some possible improvements:

- Drop limit.
- Replace collect with toLocalIterator (see the sketch after this list).
- Use orderBy |> rdd |> zipWithIndex |> filter, or, if the exact number of values is not a hard requirement, filter the data directly based on an approximated distribution, as shown in Saving a spark dataframe in multiple parts without repartitioning (in Spark 2.0.0+ there is the handy approxQuantile method); both approaches are sketched below.
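A rough sketch of those suggestions, reusing the hypothetical df, ColX, and n from the snippet above (an illustration under those assumptions, not the answer's exact code):

    import scala.collection.JavaConverters._

    // Stream sorted rows to the driver one partition at a time instead of
    // collecting everything at once; take only the first n on the driver side.
    df.orderBy($"ColX").toLocalIterator().asScala.take(n).foreach(println)

    // Exact top n without limit: orderBy |> rdd |> zipWithIndex |> filter.
    // The data stays distributed; only rows with index < n are kept.
    val topN = df.orderBy($"ColX")
      .rdd
      .zipWithIndex()
      .filter { case (_, idx) => idx < n }
      .keys

    // Approximate alternative (Spark 2.0.0+): estimate a cutoff value with
    // approxQuantile and filter directly, when an exact n is not required.
    val Array(cutoff) = df.stat.approxQuantile("ColX", Array(n.toDouble / df.count()), 0.01)
    val approxTopN = df.filter($"ColX" <= cutoff)

toLocalIterator only needs one partition's worth of rows on the driver at a time, while the zipWithIndex variant produces an exact top n without funneling everything through a single partition.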
Upvotes: 6