Reputation: 493
Suppose I have some data that is all on the same partition (I performed a .coalesce(1) on the dataframe previously). I would now like to group the data and perform aggregations on it. If I used .groupBy on the dataframe, would the groups be placed onto different nodes?
I would like to avoid that if possible, because I want to perform these calculations on the groups without shuffling too much.
Upvotes: 1
Views: 1819
Reputation: 1118
First, coalesce(1) doesn't guarantee that all your data will be on a single node; to be sure of that you have to use repartition(1), which forces all your data onto a single node. coalesce only merges partitions that are on the same node, so if your data is distributed across 5 nodes (with multiple partitions on each), it will keep 5 partitions at the end. repartition forces a shuffle to move all your data to a single node.
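As a minimal sketch of that difference (assuming an existing dataframe df; the exact plan text varies by Spark version), you can compare the physical plans, where a shuffle shows up as an Exchange node:
// Sketch only: df is any existing DataFrame.
// coalesce(1) appears in the plan as a Coalesce node, without a shuffle.
df.coalesce(1).explain()
// repartition(1) appears as an Exchange node, i.e. a full shuffle
// that moves every row into a single partition.
df.repartition(1).explain()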
But if your concern is the number of partitions used in aggregations, it depends. If the aggregation is only a reduce over all your data, Spark SQL will first reduce within each node and then reduce the per-node results; an example would be a count. But for bucketized aggregations, like counting the number of elements per id, Spark first reduces within each node, then shuffles the data into buckets so that all the per-node partial reductions for the same id end up on the same node, and reduces them again. The number of buckets is configured with the property spark.sql.shuffle.partitions, and each bucket is executed as a task in your job. Be careful, because setting spark.sql.shuffle.partitions to one could make other parts of your process, such as joins or big aggregations, slower, or cause out-of-memory errors.
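As a rough sketch of that two-step pattern (assuming an active SparkSession named spark and a dataframe df with an id column; both are assumptions here), you can lower the shuffle partition count and inspect the plan, where the partial and final HashAggregate steps around the Exchange correspond to the per-node reduce and the post-shuffle reduce described above:
// Sketch only: spark is the active SparkSession, df has an "id" column.
// Use a single shuffle partition for the grouped aggregation.
spark.conf.set("spark.sql.shuffle.partitions", "1")
// The plan shows HashAggregate (partial) -> Exchange -> HashAggregate (final):
// each node reduces its own data first, then the partial results are
// shuffled into one bucket and reduced again.
df.groupBy("id").count().explain()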
Upvotes: 1
Reputation: 4540
It depends. By default, the number of partitions after a groupBy is defined by spark.sql.shuffle.partitions. One way to avoid that shuffle is to use repartition with an explicit partitioning expression instead of coalesce:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import sparkSession.implicits._  // enables the $"foo" column syntax
val df = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(Seq(Row(1, "a"), Row(1, "b"), Row(2, "c"))),
  StructType(List(StructField("foo", IntegerType, true), StructField("bar", StringType, true))))
df.repartition(numPartitions = 1, $"foo").groupBy("foo").agg(count("*")).explain()
In general, one can use the Spark web UI and monitor the shuffle read/write metrics on the "Stages" tab.
Upvotes: 0