Reputation: 202
I have a dataset of around 60 columns and 3000 rows. I am using orderBy to sort the rows in the dataset and then writing them to a file, but it is not giving the correct results as expected.
dataset.orderBy(new Column(col_name).desc())
.coalesce(4)
.write()
.format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "false")
.mode(SaveMode.Overwrite)
.save("hdfs://" + filePath);
Please let me know what I am missing here.
I also found the solution below, but I don't think it is the correct approach:
Row[] rows = dataset.take(3000);
for (Row row : rows) {
    // write each row to the file one by one
    System.out.println(row);
}
Upvotes: 0
Views: 1387
Reputation: 1771
Because your .coalesce(4)
shuffles your DataFrame's order. Coalesce first, then sort:
dataset
.coalesce(4)
.orderBy(new Column(col_name).desc())
.write()
.format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "false")
.mode(SaveMode.Overwrite)
.save("hdfs://" + filePath);
You should also set spark.sql.shuffle.partitions
to 4 in your Spark context, because orderBy also triggers a shuffle.
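For example, a minimal sketch of setting this, assuming a SparkSession variable named spark (on older Spark versions the same sql call works on a SQLContext):

// Make the shuffle triggered by orderBy produce exactly 4 partitions,
// so the sorted write emits 4 part files.
spark.sql("set spark.sql.shuffle.partitions=4");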
Upvotes: 2
Reputation: 2091
As per your clarification in the comments, you need your ordered
output to be contained in a single file.
With Spark alone, that's possible only with spark.sql("set spark.sql.shuffle.partitions=1")
followed by orderBy
and write. The drawback is that it won't scale
for big data, as it will not be parallelized.
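For reference, a sketch of that single-file variant, reusing the dataset, col_name, and filePath from the question (spark is assumed to be the SparkSession or SQLContext):

// Force a single shuffle partition: orderBy then yields one sorted
// partition, so the write produces exactly one CSV part file.
// Everything runs on one task, so this does not scale.
spark.sql("set spark.sql.shuffle.partitions=1");

dataset.orderBy(new Column(col_name).desc())
       .write()
       .format("com.databricks.spark.csv")
       .option("delimiter", ",")
       .option("header", "false")
       .mode(SaveMode.Overwrite)
       .save("hdfs://" + filePath);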
A workaround is:
1. orderBy with maximum parallelized work (i.e. don't coalesce or "set spark.sql.shuffle.partitions=1") and have n number of files.
2. While writing, record the first col_name value of each file and maintain a map of [(col_name value), filepath].
3. Merge the files in the order of their col_name values (see the sketch after this answer). This will maintain your ordering.
The idea is that the merging part will be mostly single-threaded, so at least do the sorting in a distributed way :)
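A minimal sketch of the merge step, assuming the map from step 2 holds each part file's leading col_name value; the names SortedFileMerger, merge, sortedFiles, and mergedOutput are illustrative, not from the original answer:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.TreeMap;

public class SortedFileMerger {
    // Concatenates already-sorted part files into one output file.
    // sortedFiles maps each file's first col_name value to its path;
    // construct the TreeMap with Comparator.reverseOrder() to match
    // the descending sort used in the question.
    public static void merge(TreeMap<String, Path> sortedFiles, Path mergedOutput) throws IOException {
        try (OutputStream out = Files.newOutputStream(mergedOutput)) {
            // TreeMap iterates its values in key order, so appending the
            // files in that order preserves the global sort from Spark.
            for (Path part : sortedFiles.values()) {
                Files.copy(part, out); // single-threaded append of each sorted part
            }
        }
    }
}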
Upvotes: 0
Reputation: 27383
The problem is that coalesce
will merge your existing partitions in an unsorted way (and no, coalesce
will not cause a shuffle).
If you want 4 files and sorting within the files, you need to change spark.sql.shuffle.partitions
before the orderBy
; this will cause your shuffle to have 4 partitions.
spark.sql("set spark.sql.shuffle.partitions=4")
dataset.orderBy(new Column(col_name).desc())
.write()
.format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "false")
.mode(SaveMode.Overwrite)
.save("hdfs://" + filePath);
If you only care about the sorting within the files, you could also use sortWithinPartitions(new Column(col_name).desc()).
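For instance, a sketch of that variant (each of the 4 output files is sorted internally, but there is no global ordering across files):

dataset.coalesce(4)
       .sortWithinPartitions(new Column(col_name).desc())
       .write()
       .format("com.databricks.spark.csv")
       .option("delimiter", ",")
       .option("header", "false")
       .mode(SaveMode.Overwrite)
       .save("hdfs://" + filePath);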
Upvotes: 3