mentongwu

Reputation: 473

When I use partitionBy in a Window, why do I get a different result with Spark/Scala?

I use the sum function over a Window to compute a cumulative sum of a column, but when I convert the DataFrame back to an RDD, I found that the result has only one partition. When does the repartitioning occur?

    val rdd = sc.parallelize(List(1, 3, 2, 4, 5, 6, 7, 8), 4)
    val df = rdd.toDF("values")
      .withColumn("csum", sum(col("values")).over(Window.orderBy("values")))
    df.show()
    println(s"numPartitions ${df.rdd.getNumPartitions}")
    // 1
    //df is:
//    +------+----+
//    |values|csum|
//    +------+----+
//    |     1|   1|
//    |     2|   3|
//    |     3|   6|
//    |     4|  10|
//    |     5|  15|
//    |     6|  21|
//    |     7|  28|
//    |     8|  36|
//    +------+----+

I added partitionBy to the Window, but then the result is wrong. What should I do? This is my changed code:

    val rdd = sc.parallelize(List(1, 3, 2, 4, 5, 6, 7, 8), 4)
    val sqlContext = new SQLContext(m_sparkCtx)
    import sqlContext.implicits._
    val df = rdd.toDF("values")
      .withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values")))
    df.show()
    println(s"numPartitions ${df.rdd.getNumPartitions}")
    // 200
//df is:
//    +------+----+
//    |values|csum|
//    +------+----+
//    |     1|   1|
//    |     6|   6|
//    |     3|   3|
//    |     5|   5|
//    |     4|   4|
//    |     8|   8|
//    |     7|   7|
//    |     2|   2|
//    +------+----+

Upvotes: 0

Views: 2038

Answers (2)

koiralo

Reputation: 23099

When you create a column as

    withColumn("csum", sum(col("values")).over(Window.orderBy("values")))

the Window.orderBy("values") orders the values of column "values" in a single partition, since you haven't called the partitionBy() method to define partitions.

This changes the number of partitions from the initial 4 to 1.

The partition count is 200 in your second case because the shuffle triggered by partitionBy() uses 200 partitions by default. If you need the number of partitions to be 4, you can use methods like repartition(4) or coalesce(4).
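As a minimal sketch of that fix, applied to the df from the question (note that coalesce can only reduce the partition count):

    val df4 = df.repartition(4) // full shuffle into exactly 4 partitions
    // or: df.coalesce(4)       // cheaper (no full shuffle), but it can only
    //                          // reduce the count, so it helps the 200-partition
    //                          // case, not the 1-partition one
    println(s"numPartitions ${df4.rdd.getNumPartitions}") // 4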

Hope you got the point!

Upvotes: 0

Ramesh Maharjan

Reputation: 41957

A window function has a partitionBy api for grouping the dataframe and an orderBy api to order the grouped rows in ascending or descending order.

In your first case you hadn't defined partitionBy, thus all the values were grouped into one frame for ordering purposes, which shuffles the data into one partition.

But in your second case you had partitionBy defined on "values" itself. Since each value is distinct, each row is grouped into its own group, so the cumulative sum of each group is just the value itself.

The partition count in the second case is 200, as that is the default number of shuffle partitions in Spark (spark.sql.shuffle.partitions) when a shuffle occurs and you haven't defined the partitioning.
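As a sketch, if you want fewer shuffle partitions you can lower that default before running the window query; this is a standard Spark config, set here through the sqlContext from the question:

    // make shuffles (including the one introduced by partitionBy) produce 4 partitions
    sqlContext.setConf("spark.sql.shuffle.partitions", "4")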

To get the same result from your second case as you got with the first, you need to group your dataframe as in your first case, i.e. into one group. For that you can create another column with a constant value and use that column for partitionBy, as in the sketch below.
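A minimal sketch of that idea, assuming the same rdd as in the question; the helper column name "grp" is just an illustrative choice:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lit, sum}

    val df = rdd.toDF("values")
      .withColumn("grp", lit(1)) // constant value: every row lands in the same group
      .withColumn("csum", sum(col("values")).over(Window.partitionBy("grp").orderBy("values")))
      .drop("grp")               // drop the helper column once the sum is computed
    df.show() // same cumulative sums as in the first case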

Upvotes: 2
