Thomas

Reputation: 13

Spark SQL orderBy and global ordering across partitions

I want to sort the DataFrame so that the partitions are sorted both internally and across each other, i.e. all elements of one partition are either <= or >= all elements of another partition. This is important because I want to use Window functions with Window.partitionBy("partitionID"). However, there is something wrong with my understanding of how Spark works.

I run the following sample code:

val df = sc.parallelize(List(10, 8, 5, 9, 1, 6, 4, 7, 3, 2), 5)
         .toDF("val")
         .withColumn("partitionID", spark_partition_id())
df.show
+---+-----------+
|val|partitionID|
+---+-----------+
| 10|          0|
|  8|          0|
|  5|          1|
|  9|          1|
|  1|          2|
|  6|          2|
|  4|          3|
|  7|          3|
|  3|          4|
|  2|          4|
+---+-----------+

So far so good: 5 partitions, as expected, with no ordering within or across them. To fix that I do:

scala> val df2 = df.orderBy("val").withColumn("partitionID2",spark_partition_id)
df2: org.apache.spark.sql.DataFrame = [val: int, partitionID: int, partitionID2: int]

scala> df2.show
+---+-----------+------------+
|val|partitionID|partitionID2|
+---+-----------+------------+
|  1|          2|           2|
|  2|          4|           4|
|  3|          4|           4|
|  4|          3|           3|
|  5|          1|           1|
|  6|          2|           2|
|  7|          3|           3|
|  8|          0|           0|
|  9|          1|           1|
| 10|          0|           0|
+---+-----------+------------+

Now the val column is sorted, as expected, but the partitions themselves are not "sorted". My expected result is something along the lines of:

+---+-----------+------------+
|val|partitionID|partitionID2|
+---+-----------+------------+
|  1|          2|           2|
|  2|          4|           2|
|  3|          4|           4|
|  4|          3|           4|
|  5|          1|           1|
|  6|          2|           1|
|  7|          3|           3|
|  8|          0|           3|
|  9|          1|           0|
| 10|          0|           0|
+---+-----------+------------+

or something equivalent, i.e. subsequent sorted elements belong in the same partition. Can you point out which part of my logic is flawed and how to get the intended behavior in this example? Any help is appreciated.

I ran the above using Scala and Spark 1.6, if that is relevant.
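
For reference, this is roughly the window query the ordering is meant to enable. A sketch only: row_number is just a stand-in for whatever per-partition computation I actually need, and it only gives globally meaningful results if each physical partition holds a contiguous range of val.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Window over each physical partition, ordered by the value column.
val w = Window.partitionBy("partitionID").orderBy("val")
df2.withColumn("rowNum", row_number().over(w)).show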

Upvotes: 0

Views: 722

Answers (1)

mvasyliv

Reputation: 1214

import org.apache.spark.sql.functions.{col, spark_partition_id}

val df2 = df
  .orderBy("val")
  .repartitionByRange(5, col("val"))   // range-partition by "val": contiguous, non-overlapping ranges per partition
  .withColumn("partitionID2", spark_partition_id())

df2.show(false)
//    +---+-----------+------------+
//    |val|partitionID|partitionID2|
//    +---+-----------+------------+
//    |1  |2          |0           |
//    |2  |4          |0           |
//    |3  |4          |1           |
//    |4  |3          |1           |
//    |5  |1          |2           |
//    |6  |2          |2           |
//    |7  |3          |3           |
//    |8  |0          |3           |
//    |9  |1          |4           |
//    |10 |0          |4           |
//    +---+-----------+------------+
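
Two caveats worth noting: repartitionByRange was added in Spark 2.3, so it is not available on the Spark 1.6 mentioned in the question, and the range-partitioning shuffle only assigns rows to partitions by range; it does not by itself promise an order inside each partition. A sketch of a variant that makes both properties explicit (same column names as above):

import org.apache.spark.sql.functions.{col, spark_partition_id}

val df3 = df
  .repartitionByRange(5, col("val"))   // contiguous ranges across partitions (Spark 2.3+)
  .sortWithinPartitions("val")         // explicit sort inside each partition
  .withColumn("partitionID2", spark_partition_id())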

Upvotes: 0
