Reputation: 473
I use the sum
function over a Window to get a cumulative sum of a column, but when I convert the DataFrame back to an RDD, I found that the result has only one partition. When does the repartitioning occur?
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

val rdd = sc.parallelize(List(1, 3, 2, 4, 5, 6, 7, 8), 4)
val df = rdd.toDF("values")
  .withColumn("csum", sum(col("values")).over(Window.orderBy("values")))
df.show()
println(s"numPartitions ${df.rdd.getNumPartitions}")
// 1
//df is:
// +------+----+
// |values|csum|
// +------+----+
// | 1| 1|
// | 2| 3|
// | 3| 6|
// | 4| 10|
// | 5| 15|
// | 6| 21|
// | 7| 28|
// | 8| 36|
// +------+----+
I added partitionBy to the Window, but the result is wrong. What should I do? This is my changed code:
import org.apache.spark.sql.SQLContext

val rdd = sc.parallelize(List(1, 3, 2, 4, 5, 6, 7, 8), 4)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = rdd.toDF("values")
  .withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values")))
df.show()
println(s"numPartitions ${df.rdd.getNumPartitions}")
// 200
//df is:
// +------+----+
// |values|csum|
// +------+----+
// | 1| 1|
// | 6| 6|
// | 3| 3|
// | 5| 5|
// | 4| 4|
// | 8| 8|
// | 7| 7|
// | 2| 2|
// +------+----+
Upvotes: 0
Views: 2038
Reputation: 23099
When you create a column as
withColumn("csum", sum(col("values")).over(Window.orderBy("values")))
the Window.orderBy("values")
orders the values of the "values" column within a single partition, since you haven't used the partitionBy()
method to define any partitions.
This changes the number of partitions
from the initial 4 to 1.
The partition count is 200 in your second case because the shuffle triggered by partitionBy()
uses Spark's default of 200 shuffle partitions (spark.sql.shuffle.partitions). If you need 4 partitions, you can use methods like repartition(4)
or coalesce(4)
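A minimal sketch of that last suggestion, assuming df is the windowed DataFrame from your second snippet: repartition(4) does a full shuffle into exactly 4 partitions, while coalesce(4) merges the existing partitions without a full shuffle.
// assuming `df` is the windowed DataFrame from the second snippet above
val repartitioned = df.repartition(4)          // full shuffle into 4 partitions
println(repartitioned.rdd.getNumPartitions)    // 4

val coalesced = df.coalesce(4)                 // merges partitions, avoids a full shuffle
println(coalesced.rdd.getNumPartitions)        // 4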
Hope you got the point!
Upvotes: 0
Reputation: 41957
The Window
function has a partitionBy
api for grouping the dataframe
and an orderBy
api for ordering the grouped rows in ascending or descending order.
In your first case you hadn't defined partitionBy
, so all the values were put into one group
for ordering purposes, which shuffles the data into a single partition.
But in your second case you have partitionBy
defined on values
itself. Since each value is distinct, each row ends up in its own group.
The partition count in the second case is 200, as that is the default number of shuffle partitions defined in Spark (spark.sql.shuffle.partitions)
when you haven't configured it and a shuffle occurs.
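As a sketch, assuming the Spark 1.x sqlContext, rdd and imports from the question's snippets, that default can be lowered before the window operation runs, so the shuffle produces fewer partitions:
// assuming sqlContext, rdd, Window, sum and col from the question's snippets
sqlContext.setConf("spark.sql.shuffle.partitions", "4")  // lower the default of 200

val df2 = rdd.toDF("values")
  .withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values")))
println(df2.rdd.getNumPartitions)  // 4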
To get the same result from your second case as you got with the first case, you need to group your dataframe
as in the first case, i.e. into one group. For that you can create another column
with a constant value and use that column for partitionBy
, as in the sketch below.
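A minimal sketch of that idea, reusing the rdd and imports from the question; the constant column "grp" is a name chosen here only for illustration, and with it every row falls into the same window partition, so csum is again the running total from the first case.
import org.apache.spark.sql.functions.lit

// "grp" is an illustrative constant column so all rows share one window partition
val df3 = rdd.toDF("values")
  .withColumn("grp", lit(1))
  .withColumn("csum", sum(col("values")).over(Window.partitionBy("grp").orderBy("values")))
  .drop("grp")
df3.show()  // csum is the cumulative sum 1, 3, 6, 10, 15, 21, 28, 36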
Upvotes: 2