ak17

Reputation: 202

orderby is not giving correct results in spark SQL

I have a dataset of around 60 columns and 3000 rows. I am using orderBy to sort the rows in the dataset before writing them to a file, but it is not giving the correct results as expected.

dataset.orderBy(new Column(col_name).desc())
                .coalesce(4)
                .write()
                .format("com.databricks.spark.csv")
                .option("delimiter", ",")
                .option("header", "false")
                .mode(SaveMode.Overwrite)
                .save("hdfs://" + filePath);

Please let me know what I am missing here.

I also found the workaround below, but I don't think it is the correct solution:

        Row[] rows = dataset.take(3000);

        for (Row row : rows) {
            // here I am writing to a file row by row
            System.out.println(row);
        }

Upvotes: 0

Views: 1387

Answers (3)

maxime G

Reputation: 1771

Because your .coalesce(4) scrambles your DataFrame's row order.

Coalesce first, then sort:

dataset.coalesce(4)
       .orderBy(new Column(col_name).desc())
       .write()
       .format("com.databricks.spark.csv")
       .option("delimiter", ",")
       .option("header", "false")
       .mode(SaveMode.Overwrite)
       .save("hdfs://" + filePath);

You should also set spark.sql.shuffle.partitions to 4 in your Spark context, because orderBy also triggers a shuffle.
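For illustration, a minimal sketch of that setting, assuming spark is the active SparkSession (the variable name is an assumption, not from the question):

    // keep the orderBy shuffle at 4 partitions so the write produces 4 files
    spark.conf().set("spark.sql.shuffle.partitions", "4");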

Upvotes: 2

Sanket9394

Reputation: 2091

As per your clarification in the comments, you need your ordered output to be contained in a single file.

With Spark alone, that's possible only with spark.sql("set spark.sql.shuffle.partitions=1") followed by orderBy and write. The drawback is that this won't scale for big data, since the work will not be parallelized.
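For illustration, a minimal sketch of that single-file approach, reusing the question's variables and assuming spark is the active SparkSession:

    // force the orderBy shuffle into one partition: a single task sorts
    // everything and the write produces a single part file
    spark.sql("set spark.sql.shuffle.partitions=1");

    dataset.orderBy(new Column(col_name).desc())
           .write()
           .format("com.databricks.spark.csv")
           .option("delimiter", ",")
           .option("header", "false")
           .mode(SaveMode.Overwrite)
           .save("hdfs://" + filePath);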

A workaround is:

  • Make Spark do the orderBy with maximum parallelism (i.e. don't coalesce or "set spark.sql.shuffle.partitions=1") and let it produce n files.
  • Add some extra logic to your file-merging code:
  • List all the files, fetch the value of col_name from each, and maintain a map of [(col_name value), filepath].
  • Sort the map by key (the value of col_name).
  • Then perform your merge, as sketched after this list.

This will maintain your ordering.

The idea is that the merging part will be mostly single-threaded, so at least do the sorting in a distributed way :)
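A minimal sketch of that merge, assuming the part files were copied to a local directory and that col_name is the first CSV column (the paths, class name, and column position are assumptions for illustration):

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class OrderedMerge {

        public static void main(String[] args) throws IOException {
            Path partsDir = Paths.get("/tmp/spark_output"); // hypothetical local copy of the output dir
            Path merged = Paths.get("/tmp/merged.csv");     // hypothetical merge target

            try (Stream<Path> files = Files.list(partsDir)) {
                // orderBy range-partitions the data, so each part file holds a
                // contiguous key range; sorting the files by their first row's key
                // (descending, to match desc()) restores the global order.
                List<Path> ordered = files
                        .filter(p -> p.getFileName().toString().startsWith("part-"))
                        .sorted(Comparator.comparing(OrderedMerge::firstKey).reversed())
                        .collect(Collectors.toList());

                try (BufferedWriter out = Files.newBufferedWriter(merged)) {
                    for (Path part : ordered) {
                        // fine for ~3000 rows; stream line by line for bigger files
                        for (String line : Files.readAllLines(part)) {
                            out.write(line);
                            out.newLine();
                        }
                    }
                }
            }
        }

        // Reads the first line of a part file and extracts the sort column.
        // Note: this compares keys as strings; numeric keys need a numeric comparator.
        private static String firstKey(Path part) {
            try (Stream<String> lines = Files.lines(part)) {
                return lines.findFirst()
                            .map(line -> line.split(",")[0])
                            .orElse("");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }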

Upvotes: 0

Raphael Roth

Reputation: 27383

The problem is that coalesce will merge your existing partitions in an unsorted way (and no, coalesce will not cause a shuffle).

If you want 4 files and sorting within the files, you need to change spark.sql.shuffle.partitions before the orderBy; this makes the shuffle produce 4 partitions.

spark.sql("set spark.sql.shuffle.partitions=4")

dataset.orderBy(new Column(col_name).desc())
            .write()
            .format("com.databricks.spark.csv")
            .option("delimiter", ",")
            .option("header", "false")
            .mode(SaveMode.Overwrite)
            .save("hdfs://" + filePath);

If you only care about the sorting within the files, you can also use sortWithinPartitions(new Column(col_name).desc()).
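For illustration, a minimal sketch of that variant, reusing the question's variables:

    // 4 output files, each internally sorted; no global order across files
    dataset.coalesce(4)
           .sortWithinPartitions(new Column(col_name).desc())
           .write()
           .format("com.databricks.spark.csv")
           .option("delimiter", ",")
           .option("header", "false")
           .mode(SaveMode.Overwrite)
           .save("hdfs://" + filePath);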

Upvotes: 3
