Xinwei Liu

Reputation: 353

DataFrame orderBy followed by limit in Spark

I have a program that generates a DataFrame, on which it will run something like

    Select Col1, Col2...
    orderBy(ColX) limit(N)

However, when I collect the data in the end, I find that it causes the driver to OOM if I take a large enough top N.

Another observation is that if I just do sort or top alone, this problem does not happen. So it happens only when there is sort and top at the same time.

I am wondering why this could be happening. In particular, what is really going on underneath this combination of two transforms? How does Spark evaluate a query with both sorting and limit, and what is the corresponding execution plan?

Also, just curious: does Spark handle sort and top differently between DataFrame and RDD?

EDIT: Sorry, I didn't mean collect. What I originally meant is that the problem occurs when I call any action to materialize the data, regardless of whether it is collect (or any other action sending data back to the driver) or not. So the problem is definitely not about the output size.

Upvotes: 4

Views: 6507

Answers (1)

zero323

Reputation: 330093

While it is not clear why this fails in this particular case, there are multiple issues you may encounter:

  • When you use limit it simply puts all data on a single partition, no matter how big n is. So while it doesn't explicitly collect, it is almost as bad (you can inspect the plan yourself with the sketch after this list).
  • On top of that, orderBy requires a full shuffle with range partitioning, which can result in different issues when the data distribution is skewed.
  • Finally, when you collect, the results can be larger than the amount of memory available on the driver.
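For reference, here is a minimal sketch (Scala, local session) of how to look at the physical plan Spark generates for orderBy followed by limit. The DataFrame, column name and n below are assumptions standing in for the question's setup; the exact operators you see depend on the Spark version and on how the result is consumed.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder()
      .appName("orderBy-limit-plan")
      .master("local[*]")        // local session just to inspect the plan
      .getOrCreate()

    // Toy stand-in for the question's DataFrame.
    val df = spark.range(0L, 1000000L).toDF("ColX")
    val n = 100000

    // Prints the physical plan without materializing anything; look at
    // where the sort, the exchange and the limit end up.
    df.orderBy(col("ColX")).limit(n).explain()

    spark.stop()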

If you collect anyway there is not much you can improve here. At the end of the day driver memory will be the limiting factor, but there are still some possible improvements:

  • First of all, don't use limit.
  • Replace collect with toLocalIterator.
  • Use either orderBy |> rdd |> zipWithIndex |> filter or, if the exact number of values is not a hard requirement, filter data directly based on an approximated distribution as shown in Saving a spark dataframe in multiple parts without repartitioning (in Spark 2.0.0+ there is a handy approxQuantile method). See the sketch after this list.
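A rough sketch of the zipWithIndex variant and of the approxQuantile-based filter (Scala; df, ColX and n below are assumptions standing in for the question's DataFrame, sort column and top-N size):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder()
      .appName("top-n-without-limit")
      .master("local[*]")
      .getOrCreate()

    // Toy stand-in for the question's DataFrame.
    val df = spark.range(0L, 1000000L).toDF("ColX")
    val n = 100000L

    // orderBy |> rdd |> zipWithIndex |> filter: the sort is still a full
    // shuffle, but the top n rows stay spread across partitions instead of
    // being collapsed onto a single one by limit.
    val topNRdd = df.orderBy(col("ColX"))
      .rdd
      .zipWithIndex()
      .filter { case (_, idx) => idx < n }
      .map { case (row, _) => row }
    val topN = spark.createDataFrame(topNRdd, df.schema)

    // Alternative when the exact count is not a hard requirement:
    // estimate the cut-off value with approxQuantile (Spark 2.0.0+)
    // and filter directly, avoiding zipWithIndex altogether.
    val fraction = n.toDouble / df.count()
    val Array(cutoff) = df.stat.approxQuantile("ColX", Array(fraction), 0.01)
    val approxTopN = df.filter(col("ColX") <= cutoff)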

Upvotes: 6
