rabejens

Reputation: 8162

How to preserve order of a DataFrame when writing it as CSV with partitioning by columns?

I sort the rows of a DataFrame and write it out to disk like so:

df.
  orderBy("foo").
  write.
  partitionBy("bar", "moo").
  option("compression", "gzip").
  csv(outDir)

When I look into the generated .csv.gz files, their order is not preserved. Is this just how Spark writes partitioned output? Is there a way to preserve order when writing a DataFrame to disk with partitioning?

Edit: To be more precise: it is not the order of the CSV files that is off, but the order of the rows inside them. Say the DataFrame looks like this after df.orderBy (for simplicity, I now partition by only one column):

foo | bar | baz
===============
  1 |   1 |   1
  1 |   2 |   2
  1 |   1 |   3
  2 |   3 |   4
  2 |   1 |   5
  3 |   2 |   6
  3 |   3 |   7
  3 |   1 |   8
  4 |   2 |   9
  4 |   1 |  10

I expect it to be like this, e.g. for files in folder bar=1:

part-00000-NNN.csv.gz:

1,1
1,3
2,5

part-00001-NNN.csv.gz:

3,8
4,10

But this is what I actually get:

part-00000-NNN.csv.gz:

1,1
2,5
1,3

part-00001-NNN.csv.gz:

4,10
3,8

Upvotes: 3

Views: 4715

Answers (1)

rabejens

Reputation: 8162

It's been a while, but I ran into this again and finally came across a workaround.

Suppose your schema is:

  • time: bigint
  • channel: string
  • value: double

If you do:

df.orderBy("time").write.partitionBy("channel").csv("hdfs:///foo")

the timestamps inside the individual part-* files end up out of order.

If you do:

df.orderBy("channel", "time").write.partitionBy("channel").csv("hdfs:///foo")

the order is correct.

I think it has to do with shuffling. So, as a workaround, I now sort first by the columns I want the data partitioned by, and then by the column I want the rows ordered by within the individual files.
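Applied to the example from the question, the workaround would look like this (a sketch, reusing the df and outDir from the question above, which partitions by "bar" and "moo" and sorts by "foo"):

df.
  orderBy("bar", "moo", "foo").
  write.
  partitionBy("bar", "moo").
  option("compression", "gzip").
  csv(outDir)

Sorting by the partition columns first means the rows that land in the same part file reach the writer already grouped, so the trailing sort key ("foo") should come out in order within each file.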

Upvotes: 2
