Reputation: 4442
In a parquet data lake partitioned by year and month, with spark.default.parallelism set to e.g. 4, let's say I want to create a DataFrame comprising months 11~12 from 2017 and months 1~3 from 2018, from two sources A and B.
df = spark.read.parquet(
    "A.parquet/_YEAR={2017}/_MONTH={11,12}",
    "A.parquet/_YEAR={2018}/_MONTH={1,2,3}",
    "B.parquet/_YEAR={2017}/_MONTH={11,12}",
    "B.parquet/_YEAR={2018}/_MONTH={1,2,3}",
)
Checking the number of partitions shows that Spark used spark.default.parallelism as the default:
df.rdd.getNumPartitions()
Out[4]: 4
Taking into account that after creating df I need to perform join and groupBy operations over each period, and that the data is more or less evenly distributed over each one (around 10 million rows per period):
Question
Will a repartition improve the performance of my subsequent operations? If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10, '_MONTH', '_YEAR'))?
Upvotes: 2
Views: 1298
Reputation: 35229
Will a repartition improve the performance of my subsequent operations?
Typically it won't. The only reason to preemptively repartition data is to avoid further shuffling when the same Dataset is used for multiple joins, based on the same condition.
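One way to see whether a pre-repartition can pay off is to inspect the physical plan. A minimal sketch using the columns from the question (the exact plan text varies by Spark version):
df_ = df.repartition(10, "_MONTH", "_YEAR")
# the plan should show an Exchange hashpartitioning(_MONTH, _YEAR, 10)
df_.explain()
# a later join or groupBy on the same columns can reuse this distribution
# instead of triggering a second shuffle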
If so, if I have 10 different periods (5 per year in both A and B), should I repartition by the number of periods and explicitly reference the columns to repartition (df.repartition(10,'_MONTH','_YEAR'))?
Let's go step-by-step:
should I repartition by the number of periods
Partitioners don't guarantee a 1:1 relationship between levels and partitions, so the only thing to remember is that you cannot have more non-empty partitions than unique keys; using a significantly larger value doesn't make sense.
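To illustrate (a sketch with the question's 10 distinct (_YEAR, _MONTH) pairs; the counting helper is just for demonstration):
# asking for 100 partitions still leaves at most 10 of them non-empty
df100 = df.repartition(100, "_YEAR", "_MONTH")
non_empty = (df100.rdd
    .mapPartitions(lambda it: [sum(1 for _ in it)])
    .filter(lambda n: n > 0)
    .count())
# non_empty <= 10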
and explicitly reference the columns to repartition
If you repartition and subsequently join or groupBy, using the same set of columns for both operations is the only sensible choice.
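For example (a sketch reusing the question's columns; the point is that the aggregation keys match the repartitioning keys):
df_ = df.repartition(10, "_YEAR", "_MONTH")
# the groupBy keys match the repartitioning keys, so the existing
# hash partitioning can be reused instead of shuffling again
df_.groupBy("_YEAR", "_MONTH").count()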
Summary
Repartitioning before a join makes sense in two scenarios:
In case of multiple subsequent joins
df_ = df.repartition(10, "foo", "bar")
df_.join(df1, ["foo", "bar"])
...
df_.join(df2, ["foo", "bar"])
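Here df is shuffled only once, when df_ is created; each subsequent join on the same keys can reuse that distribution for the left side rather than shuffling df again.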
With a single join, when the desired number of output partitions is different from spark.sql.shuffle.partitions (and there is no broadcast join)
spark.conf.get("spark.sql.shuffle.partitions")
# 200
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df1_ = df1.repartition(11, "foo", "bar")
df2_ = df2.repartition(11, "foo", "bar")
df1_.join(df2_, ["foo", "bar"]).rdd.getNumPartitions()
# 11
df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
# 200
which might be preferable over changing the global setting, since spark.sql.shuffle.partitions affects every subsequent shuffle in the session and has to be restored afterwards:
spark.conf.set("spark.sql.shuffle.partitions", 11)
df1.join(df2, ["foo", "bar"]).rdd.getNumPartitions()
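# 11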
spark.conf.set("spark.sql.shuffle.partitions", 200)
Upvotes: 3