Reputation: 13
I want to sort the DataFrame so that each partition is sorted internally and the partitions are also ordered relative to each other, i.e. ALL elements of one partition are either <= or >= ALL elements of any other partition. This is important because I want to use Window functions with Window.partitionBy("partitionID"). However, there is something wrong with my understanding of how Spark works.
I run the following sample code:
val df = sc.parallelize(List((10),(8),(5),(9),(1),(6),(4),(7),(3),(2)),5)
.toDF("val")
.withColumn("partitionID",spark_partition_id)
df.show
+---+-----------+
|val|partitionID|
+---+-----------+
| 10|          0|
|  8|          0|
|  5|          1|
|  9|          1|
|  1|          2|
|  6|          2|
|  4|          3|
|  7|          3|
|  3|          4|
|  2|          4|
+---+-----------+
So far so good: 5 partitions, as expected, with no ordering within or across partitions. To fix that I do:
scala> val df2 = df.orderBy("val").withColumn("partitionID2",spark_partition_id)
df2: org.apache.spark.sql.DataFrame = [val: int, partitionID: int, partitionID2: int]
scala> df2.show
+---+-----------+------------+
|val|partitionID|partitionID2|
+---+-----------+------------+
|  1|          2|           2|
|  2|          4|           4|
|  3|          4|           4|
|  4|          3|           3|
|  5|          1|           1|
|  6|          2|           2|
|  7|          3|           3|
|  8|          0|           0|
|  9|          1|           1|
| 10|          0|           0|
+---+-----------+------------+
Now the val column is sorted, as expected, but the partitions themselves are not "sorted". My expected result is something along these lines:
+---+-----------+------------+
|val|partitionID|partitionID2|
+---+-----------+------------+
|  1|          2|           2|
|  2|          4|           2|
|  3|          4|           4|
|  4|          3|           4|
|  5|          1|           1|
|  6|          2|           1|
|  7|          3|           3|
|  8|          0|           3|
|  9|          1|           0|
| 10|          0|           0|
+---+-----------+------------+
or something equivalent, i.e. consecutive elements in sorted order belong to the same partition. Can you point out which part of my logic is flawed and how to get the intended behavior in this example? Any help is appreciated.
I run the above using Scala and Spark 1.6, if that is relevant.
Upvotes: 0
Views: 722
Reputation: 1214
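import org.apache.spark.sql.functions.{col, spark_partition_id}

// Note: repartitionByRange is only available from Spark 2.3 onward.
val df2 = df
  .repartitionByRange(5, col("val")) // each partition gets a contiguous range of val
  .sortWithinPartitions("val")       // sort after the shuffle; an earlier orderBy is not guaranteed to survive it
  .withColumn("partitionID2", spark_partition_id())
df2.show(false)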
// +---+-----------+------------+
// |val|partitionID|partitionID2|
// +---+-----------+------------+
// |1  |2          |0           |
// |2  |4          |0           |
// |3  |4          |1           |
// |4  |3          |1           |
// |5  |1          |2           |
// |6  |2          |2           |
// |7  |3          |3           |
// |8  |0          |3           |
// |9  |1          |4           |
// |10 |0          |4           |
// +---+-----------+------------+
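Since the question mentions Spark 1.6, where `repartitionByRange` does not exist, here is a rough sketch of one possible alternative using the RDD API. `RDD.sortBy` shuffles the data with a range partitioner and sorts each resulting partition, which should give the layout asked for. The object name `RangeSortSketch`, the helper `sortedWithPartitionIds`, and the `local[2]` master are illustrative assumptions, not part of the original code.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RangeSortSketch {
  // Hypothetical helper: returns (value, partitionId) pairs after a
  // range-partitioned sort, in partition order.
  def sortedWithPartitionIds(sc: SparkContext,
                             data: Seq[Int],
                             numPartitions: Int): Array[(Int, Int)] = {
    val rdd = sc.parallelize(data, numPartitions)
    // sortBy range-partitions the values and sorts inside each partition,
    // so every partition holds one contiguous slice of the sorted data.
    val sorted = rdd.sortBy(x => x, ascending = true, numPartitions = numPartitions)
    // Tag each value with the index of the partition it ended up in.
    sorted
      .mapPartitionsWithIndex((idx, it) => it.map(v => (v, idx)))
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, just for trying the sketch out.
    val conf = new SparkConf().setAppName("range-sort-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      sortedWithPartitionIds(sc, List(10, 8, 5, 9, 1, 6, 4, 7, 3, 2), 5)
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

In spark-shell the same two transformations can be applied directly to `sc.parallelize(...)`, and the tagged pairs can be turned back into a DataFrame with `.toDF("val", "partitionID2")`. The exact partition boundaries are chosen by sampling, so which values share a partition may differ from the table above.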
Upvotes: 0