suleydaman

Reputation: 493

Spark: Would a dataframe repartitioned to one node experience a shuffle when a groupBy is called on it?

Suppose I have some data that is all on the same partition (I performed a .coalesce(1) on the dataframe previously). I would now like to group the data and perform aggregations on it. If I used .groupBy on the dataframe, would the groups be placed onto different nodes?

If that is the case, I would like to avoid it, since I want to perform these calculations on the groups without shuffling too much.
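For reference, here is a minimal sketch of the setup I have in mind (assuming an existing SparkSession named spark; the column names are just placeholders):

import spark.implicits._
import org.apache.spark.sql.functions.sum

val df = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")

val grouped = df.coalesce(1)   // everything in a single partition
  .groupBy("key")
  .agg(sum("value").as("total"))

grouped.explain()              // does the plan still contain an Exchange (shuffle)?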

Upvotes: 1

Views: 1819

Answers (2)

Alfilercio

Reputation: 1118

First, coalesce(1) doesn't guarantee that all your data will be on a single node; to be sure, you have to use repartition(1), which forces all your data onto a single node. coalesce only merges partitions that are on the same node, so if your data is distributed across 5 nodes (with multiple partitions on each), it will keep 5 partitions at the end. repartition forces a shuffle to move all your data to a single node.
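A minimal sketch of the difference, assuming an existing SparkSession named spark (only the plan shapes matter here, not the toy data):

import spark.implicits._

val df = (1 to 100).toDF("value").repartition(8)  // start from several partitions

// coalesce merges existing partitions without a full shuffle; the plan shows a
// Coalesce node. repartition(1) adds an Exchange (shuffle) down to one partition.
df.coalesce(1).explain()
df.repartition(1).explain()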

But if your concern is the number of partitions in aggregations, it depends. If the aggregation is just a reduction over all your data, Spark SQL will reduce within each node first and then reduce the per-node results; an example would be a count. But for bucketized aggregations, like counting the number of elements per id, Spark first reduces within each node, then shuffles the data into buckets so that all the partial reductions for the same id end up on the same node, and reduces them again. The number of buckets is configured with the property spark.sql.shuffle.partitions, and each one is executed as a task in your job. Be careful, because setting spark.sql.shuffle.partitions to one could make other parts of your process slower, like joins or big aggregations, or cause out-of-memory errors.
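A minimal sketch of such a bucketized (per-id) aggregation, assuming an existing SparkSession named spark; the exact plan text varies by Spark version:

import spark.implicits._
import org.apache.spark.sql.functions.count

spark.conf.set("spark.sql.shuffle.partitions", "200")  // the default value

val counts = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "value")
  .groupBy("id")
  .agg(count("*").as("n"))

// The physical plan shows a partial HashAggregate per partition, then an
// Exchange hashpartitioning(id, 200), then a final HashAggregate.
counts.explain()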

Upvotes: 1

ollik1

Reputation: 4540

It depends. By default, the number of shuffle partitions is defined by spark.sql.shuffle.partitions. One way to avoid this is to use repartition with an explicit partitioning expression instead of coalesce:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import sparkSession.implicits._  // provides the $"foo" column syntax

val df = sparkSession.createDataFrame(
  sparkContext.parallelize(Seq(Row(1, "a"), Row(1, "b"), Row(2, "c"))),
  StructType(List(StructField("foo", IntegerType, true), StructField("bar", StringType, true))))
df.repartition(numPartitions = 1, $"foo").groupBy("foo").agg(count("*")).explain()

In general, one can use the Spark web UI and monitor the shuffle read/write metrics on the "Stages" tab.

Upvotes: 0
