cgam

Reputation: 39

How does coalesce() work internally in Spark?

I am exploring the coalesce() function and have a few unanswered questions:

  1. Does coalesce() move data within an executor only, or does it also move partitions spread across multiple machines? If it is executor-level only, how does coalesce() work when there is only one partition per executor per machine and there are 3 partitions on three nodes?
  2. How does coalesce(1) work?
  3. I have seen cases when running coalesce() on a DataFrame where it creates one output file of 500 MB and another of 1.2 GB. Why is there such a huge difference? I am aware that with coalesce() the partitions won't be of equal size, but what is the reason for the big file? How does coalesce() work internally such that it created a 700 MB difference between these two files?

Upvotes: 1

Views: 5144

Answers (1)

Naga

Reputation: 1253

Spark offers two transformations to resize an RDD/DataFrame/Dataset, i.e., to increase or decrease its number of partitions.

Those two transformations are:

  1. repartition
  2. coalesce

repartition: repartition is used to increase or decrease the number of partitions of an RDD/DataFrame/Dataset. Whether you increase or decrease the partitions, a full data shuffle takes place, which is an expensive operation in Spark.
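For example, a minimal sketch (assuming a SparkContext named sc is available, as in spark-shell):

val rdd = sc.parallelize(1 to 100, 2)  // start with 2 partitions
val more = rdd.repartition(8)          // full shuffle: 2 -> 8 partitions
val fewer = rdd.repartition(1)         // full shuffle even when decreasing
println(more.getNumPartitions)         // 8
println(fewer.getNumPartitions)        // 1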

coalesce: coalesce is also used to increase or decrease the number of partitions of an RDD/DataFrame/Dataset, but it behaves differently for increases and decreases.

When increasing partitions, the default coalesce(n) cannot grow the partition count without a shuffle; on an RDD you can pass shuffle = true, in which case it behaves the same as repartition. When decreasing partitions, coalesce optimizes away the full shuffle by merging partitions locally, i.e., within each executor.
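You can verify this with a small sketch (again assuming a SparkContext named sc):

val rdd = sc.parallelize(1 to 100, 4)
println(rdd.coalesce(8).getNumPartitions)                  // 4: can't grow without a shuffle
println(rdd.coalesce(8, shuffle = true).getNumPartitions)  // 8: behaves like repartition
println(rdd.coalesce(2).getNumPartitions)                  // 2: local merge, no full shuffle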

We have two types of coalesce:

  1. coalesce
  2. drastic coalesce

We use coalesce as follows in Spark programming:

RDD/DataFrame/Dataset.coalesce(n)

Here n is the number of partitions. If n is smaller than the number of executors, a drastic coalesce takes place, which moves data between executors; otherwise no cross-executor data movement is needed.

Example:

val data = 1 to 100
val rdd = sc.parallelize(data, 5)
// rdd has 5 partitions
// Spark application launched with three executors
val rdd1 = rdd.coalesce(3)
// no cross-executor data movement takes place
val rdd2 = rdd.coalesce(1)
// coalesce(1) is a drastic coalesce, i.e., all partitions from the three executors are fetched onto one

coalesce merges partitions of an RDD/DataFrame/Dataset locally within each executor. It doesn't check the sizes of the partitions, so merged partitions can end up very different in size; this is why coalesce can produce one 500 MB output file next to a 1.2 GB one. If your partition sizes are imbalanced, it is better to redistribute the data using different partitioning logic, e.g., repartition.
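To see the imbalance coalesce can produce, you can count the records per partition; a minimal sketch, assuming a SparkContext named sc:

val rdd = sc.parallelize(1 to 10, 4)
val merged = rdd.coalesce(2)
// count records per partition; sizes can differ because coalesce only glues
// existing partitions together without rebalancing the data
merged.mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
      .collect()
      .foreach { case (idx, n) => println(s"partition $idx: $n records") }

A repartition(2) on the same RDD would shuffle the data and produce near-equal partition sizes, at the cost of the shuffle.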

Upvotes: 5
