Reputation: 563
I am trying to understand the difference between coalesce() and repartition().
If I correctly understood this answer, coalesce() can only reduce the number of partitions of a dataframe, and if we try to increase the number of partitions, the partition count remains unchanged. But when I tried to execute the code below, I observed two things. Does this mean that dataframe partitions can be increased with coalesce?
When I execute the following code:
val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
println("Original dataframe partitions = " + h1b1Df.rdd.getNumPartitions)
val coalescedDf = h1b1Df.coalesce(2)
println("Coalesced dataframe partitions = " + coalescedDf.rdd.getNumPartitions
val coalescedDf1 = coalescedDf.coalesce(6)
println("Coalesced dataframe with increased partitions = " + coalescedDf1.rdd.getNumPartitions)
I get the following output:
Original dataframe partitions = 8
Coalesced dataframe partitions = 2
Coalesced dataframe with increased partitions = 6
When I execute the following code:
val inpRdd = h1b1Df.rdd
println("Original rdd partitions = " + inpRdd.getNumPartitions)
val coalescedRdd = inpRdd.coalesce(4)
println("Coalesced rdd partitions = " + coalescedRdd.getNumPartitions)
val coalescedRdd1 = coalescedRdd.coalesce(6, false)
println("Coalesced rdd with increased partitions = " + coalescedRdd1.getNumPartitions)
I get the following output:
Original rdd partitions = 8
Coalesced rdd partitions = 4
Coalesced rdd with increased partitions = 4
Upvotes: 3
Views: 3141
Reputation: 5068
If you apply several coalesces on the same dataframe without performing any transformation between those coalesces, Spark will optimize your coalesces by only applying the latest one, giving you the impression that you can increase the number of partitions using coalesce.
If we take the following code snippet:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local[8]") // use 8 threads
.appName("test-app")
.getOrCreate()
import spark.implicits._
val input = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toDF("MyCol")
val simpleCoalesce = input.coalesce(4)
val doubleCoalesce = simpleCoalesce.coalesce(8)
println(doubleCoalesce.rdd.getNumPartitions)
It will print 8. To better understand why we get this result, we can run explain(true) on doubleCoalesce:
doubleCoalesce.explain(true)
This shows the steps Spark goes through to build the final execution plan for doubleCoalesce:
== Parsed Logical Plan ==
Repartition 8, false
+- Repartition 4, false
+- Project [value#1 AS MyCol#4]
+- LocalRelation [value#1]
...
== Physical Plan ==
Coalesce 8
+- LocalTableScan [MyCol#4]
We can see that between the parsed logical plan, which is a direct parsing of the code snippet, and the physical plan, which is the plan that will actually be executed to produce the final dataframe, only the latest coalesce is kept. The coalesce(4) transformation is never applied; only coalesce(8) is applied, and we get a dataframe with 8 partitions.
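The collapse happens during logical optimization (Spark's CollapseRepartition rule), so you can also observe it directly on the optimized logical plan. A minimal sketch, reusing doubleCoalesce from above; the exact plan string may vary across Spark versions:
// The optimized logical plan is what explain(true) shows in its
// "Optimized Logical Plan" section
println(doubleCoalesce.queryExecution.optimizedPlan)
// expect a single "Repartition 8, false" node: the coalesce(4)
// has been collapsed away by the optimizer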
If we want coalesce(4) to be applied, we need to perform a transformation between the two coalesces, for instance a complex select:
import org.apache.spark.sql.functions.col
val separatedCoalesce = simpleCoalesce
.select((col("MyCol") + 0).as("MyCol"))
.coalesce(8)
println(separatedCoalesce.rdd.getNumPartitions)
Note: a simple select such as .select(col("MyCol")) will not work, as Spark will drop the select during the optimization phase.
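For comparison, here is a minimal sketch of that pitfall (collapsedAgain is just an illustrative name):
// A pass-through select is eliminated by the optimizer, so the two
// coalesces still collapse into one and only coalesce(8) survives
val collapsedAgain = simpleCoalesce
  .select(col("MyCol"))
  .coalesce(8)
println(collapsedAgain.rdd.getNumPartitions) // 8, not 4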
The separatedCoalesce snippet prints 4. If we look at the physical plan of separatedCoalesce:
== Physical Plan ==
Coalesce 8
+- *(1) Project [(MyCol#4 + 0) AS MyCol#9]
+- Coalesce 4
+- LocalTableScan [MyCol#4]
We can see that here coalesce(4) is applied, so in the end we get a dataframe with only 4 partitions, even though we applied coalesce(8) afterwards.
Spark optimizations can be tricky and lead you to believe that something happened when it didn't. Keep in mind that Spark doesn't execute exactly the code you wrote, but an optimized version of it.
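As a side note, if the goal really is to increase the number of partitions of a dataframe, repartition is the operation meant for that, since it always performs a shuffle. A minimal sketch reusing the input dataframe from above:
// repartition shuffles the data, so it can raise the partition count
val repartitioned = input.repartition(16)
println(repartitioned.rdd.getNumPartitions) // 16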
Upvotes: 2
Reputation: 563
Coalesce on a dataframe cannot increase the number of partitions beyond the total number of cores in the cluster.
val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
h1b1Df.rdd.getNumPartitions // prints 8
val coalescedDf = h1b1Df.coalesce(21)
coalescedDf.rdd.getNumPartitions // prints 8
val coalescedDf1 = coalescedDf.coalesce(2)
coalescedDf1.rdd.getNumPartitions // prints 2
val coalescedDf2 = coalescedDf.coalesce(7)
coalescedDf2.rdd.getNumPartitions // prints 7
Upvotes: 1
Reputation: 98
Coalesce can be used to increase partitions by setting shuffle = true, which is equivalent to repartition. When you use coalesce with shuffle = false to increase partitions, no data movement happens, so data from one partition can't be moved into new partitions and the partition count stays the same. When reducing, coalesce simply merges nearby partitions without a full shuffle.
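A minimal sketch on an RDD, assuming an existing SparkSession named spark:
// With shuffle = true, coalesce can increase the partition count,
// exactly like repartition; with shuffle = false it cannot
val rdd = spark.sparkContext.parallelize(1 to 100, 4)
println(rdd.coalesce(8, shuffle = true).getNumPartitions)  // 8
println(rdd.coalesce(8, shuffle = false).getNumPartitions) // 4, unchanged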
Upvotes: 1