Niketa
Niketa

Reputation: 563

Can Coalesce increase partitions of Spark DataFrame

I am trying to understand the difference between coalesce() and repartition().

If I correctly understood this answer, coalesce() can only reduce number of partitions of dataframe and if we try to increase the number of partitions then number of partitions remains unchanged.

But when I tried to execute below code, I observed two things

  1. For Dataframe with coalesce number of partitions can be increased
  2. For Rdd if shuffle = false then number of partitions cannot be increase with coalesce.

Does it mean that with coalesce dataframe partitions can be increased?

Applying coalesce to dataframe

When I execute the following code:

val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
println("Original dataframe partitions = " + h1b1Df.rdd.getNumPartitions)

val coalescedDf = h1b1Df.coalesce(2)
println("Coalesced dataframe partitions = " + coalescedDf.rdd.getNumPartitions

val coalescedDf1 = coalescedDf.coalesce(6) 
println("Coalesced dataframe with increased partitions = " + coalescedDf1.rdd.getNumPartitions) 

I get the following output

Original dataframe partitions =  8
Coalesced dataframe partitions = 2
Coalesced dataframe with increased partitions = 6

Applying coalesce to RDD

When I execute the following code:

val inpRdd = h1b1Df.rdd
println("Original rdd partitions = " + inpRdd.getNumPartitions)

val coalescedRdd = inpRdd.coalesce(4)
println("Coalesced rdd partitions = " + coalescedRdd.getNumPartitions)

val coalescedRdd1 = coalescedRdd.coalesce(6, false)
println("Coalesced rdd with increased partitions = " + coalescedRdd1.getNumPartitions)

I get the following output:

Original rdd partitions =  8
Coalesced rdd partitions = 4
Coalesced rdd with increased partitions = 4

Upvotes: 3

Views: 3141

Answers (3)

Vincent Doba
Vincent Doba

Reputation: 5068

If you apply several coalesces on the same dataframe without performing any transformation between those coalesces, Spark will optimize your coalesces by only applying the latest one, giving you the impression that you can increase number of partitions using coalesce

Explanation

If we take the following code snippet:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .master("local[8]") // use 8 threads
    .appName("test-app")
    .getOrCreate()

import spark.implicits._

val input = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toDF("MyCol")
val simpleCoalesce = input.coalesce(4)
val doubleCoalesce = simpleCoalesce.coalesce(8)

println(doubleCoalesce.rdd.getNumPartitions)

It will print 8. To better understand why we get this result, we perform an explain(true) on doubleCoalesce:

doubleCoalesce.explain(true)

We get the following steps to build the final spark execution plan to resolve doubleCoalesce:

== Parsed Logical Plan ==
Repartition 8, false
+- Repartition 4, false
   +- Project [value#1 AS MyCol#4]
      +- LocalRelation [value#1]

...

== Physical Plan ==
Coalesce 8
+- LocalTableScan [MyCol#4]

We can see that between the parsed logical plan, which is a simple parsing of the code snippet, and the physical plan, which is the plan that will be applied to generate final dataframe, only the latest coalesce is kept, so the coalesce(4) transformation is actually never applied. And so, only coalesce(8) is applied and we get a dataframe with 8 partitions.

If we want coalesce(4) to be applied, we need to perform a transformation between the two coalesces, for instance a complex select:

import org.apache.spark.sql.functions.col

val separatedCoalesce = simpleCoalesce
  .select((col("MyCol") + 0).as("MyCol"))
  .coalesce(8)

println(separatedCoalesce.rdd.getNumPartitions)

Note: simple select as .select(col("MyCol")) will not work as spark will drop select during optimization phase

This code print 4. If we look at the physical plan of separatedCoalesce:

== Physical Plan ==
Coalesce 8
+- *(1) Project [(MyCol#4 + 0) AS MyCol#9]
   +- Coalesce 4
      +- LocalTableScan [MyCol#4]

We can see that here, coalesce(4) is applied and so at the end we get a dataframe with only 4 partitions, although we applied coalesce(8)

Conclusion

Spark optimizations can be tricky and lead you to think that something that didn't happen happened. So keep in mind that spark doesn't execute exactly the written code but an optimized version of it.

Upvotes: 2

Niketa
Niketa

Reputation: 563

Coalesce for dataframe cannot increase partitions greater than total number of cores in the cluster.

 val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
 h1b1Df.rdd.getNumPartitions        // prints 8

 val cloasedDf = h1b1Df.coalesce(21)  
 cloasedDf.rdd.getNumPartitions     // prints 8

 val cloasedDf1 = cloasedDf.coalesce(2) // prints 2
 cloasedDf1.rdd.getNumPartitions

 val cloasedDf2 = cloasedDf.coalesce(7) // prints 7
 cloasedDf2.rdd.getNumPartitions

Upvotes: 1

Newbie_Bigdata
Newbie_Bigdata

Reputation: 98

Coalesce can be used to increase partitions by setting shuffle=true which is equal to repartition. When you use coalesce with shuffle=false to increase, data movement wont happen. So one partition data cant be moved to another partition. Whereas while reduce it just merges the nearest partitions.

Thanks,

Upvotes: 1

Related Questions