Ehud Eshet

Reputation: 133

How to force in-memory chunked sort in Spark SQL?

The Parquet file format is sensitive to the order of records. Its columnar encoding may produce significantly smaller files depending on the sort order. On the other hand, sorting a terabyte of input records is very expensive.

Splitting the input into chunks of, say, 10 GB allows an in-memory sort while producing Parquet files nearly as small as if the entire 1 TB had been fully sorted.

Is it possible to instruct Spark SQL to perform a chunked sort before writing the Parquet file?

Another use case is merging many small Parquet files into one, applying a chunked sort before writing the unified Parquet file.

Upvotes: 3

Views: 1516

Answers (1)

zero323

Reputation: 330063

As far as I am aware, there is no such option available out of the box in Spark < 2.0.0. One thing you can try is to combine coalesce with the Hive SORT BY clause before writing, which should have a similar effect:

val df: DataFrame = ???
val n: Int = ??? // number of output partitions (chunks)

// coalesce to n partitions, then order rows within each partition using SORT BY
df.coalesce(n).registerTempTable("df")
sqlContext.sql("SELECT * FROM df SORT BY foo, bar").write.parquet(...)

or

df.coalesce(n).sortWithinPartitions($"foo", $"bar").write.parquet(...)

Keep in mind that SORT BY is not equivalent to DataFrame.sort: like sortWithinPartitions, it only orders rows within each partition rather than producing a globally sorted output.
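To make the distinction concrete, here is a minimal sketch reusing the df temp table and the placeholder columns foo and bar from above:

// ORDER BY enforces a total ordering across the whole result,
// which is what DataFrame.sort / orderBy does and requires a full shuffle
sqlContext.sql("SELECT * FROM df ORDER BY foo, bar")

// SORT BY only orders rows within each partition, matching
// DataFrame.sortWithinPartitions, so each chunk can be sorted in memory
sqlContext.sql("SELECT * FROM df SORT BY foo, bar")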

Spark 2.0.0 introduced the bucketBy and sortBy methods, where the latter sorts the output within each bucket by the given columns, and this should support Parquet:

val df: DataFrame = ???
val nBuckets: Int = ??? // number of buckets for the output table

// Bucket the output by foo and sort each bucket by (foo, bar)
df.write.bucketBy(nBuckets, "foo").sortBy("foo", "bar").saveAsTable(...)

Note: this seems to work only when saving Parquet files with saveAsTable; it does not look like the Parquet writer is supported directly (df.write.bucketBy(...).sortBy(...).parquet(...)) in spark-2.0.0-preview.
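For completeness, a sketch of the path that does appear to work, assuming a hypothetical table name bucketed_table and an explicitly selected Parquet format (both illustrative, not part of the original answer):

// Write a bucketed, sorted table backed by Parquet via saveAsTable
df.write
  .format("parquet")
  .bucketBy(nBuckets, "foo")
  .sortBy("foo", "bar")
  .saveAsTable("bucketed_table")  // hypothetical table name

// The direct Parquet writer does not appear to accept bucketBy/sortBy here:
// df.write.bucketBy(nBuckets, "foo").sortBy("foo", "bar").parquet("/some/path")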

Upvotes: 1
