Reputation: 1868
To ensure colocation and copartitioning of two Datasets, they must be partitioned by the same key(s) and the same number of partitions within the same job.
If I join these Datasets does the resulting joined Dataset retain this partitioning?
If I then partition a third Dataset by the same key(s) and number of partitions within the same job does this guarantee copartitioning/colocation with the joined Dataset?
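For concreteness, here is a minimal sketch of the setup I mean (the names are illustrative, and a SparkSession spark is assumed to be in scope):

import org.apache.spark.sql.functions.col

// Two Datasets partitioned by the same key and the same number of partitions
val a = spark.range(1, 100).repartition(8, col("id"))
val b = spark.range(1, 100).repartition(8, col("id"))

// 1) Does the join result keep hashpartitioning(id, 8)?
val joined = a.join(b, a("id") === b("id"))

// 2) Is a third Dataset, partitioned the same way, copartitioned with the join result?
val c = spark.range(1, 100).repartition(8, col("id"))
joined.join(c, a("id") === c("id")).explain(true)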
Upvotes: 3
Views: 5223
Reputation: 116
The question and Mikhail Dubkov's answer are very helpful. I'd like to point out that spark.sql.shuffle.partitions determines the final number of partitions. So, if your data has 100 partitions and spark.sql.shuffle.partitions is 5, then after a join the resulting DataFrame will have 5 partitions.
Here's the code in PySpark:
from pyspark.sql import SparkSession

def get_spark():
    # Helper assumed by the original snippet: return the shared SparkSession.
    return SparkSession.builder.getOrCreate()

def example_shuffle_partitions(data_partitions=10, shuffle_partitions=4):
    """
    spark.sql.shuffle.partitions is very important. After a join or other operation
    requiring a shuffle, it redefines the number of partitions. So, you may start with
    500 data partitions, but after a shuffle you're likely to end up with
    spark.sql.shuffle.partitions.
    This example illustrates how spark.sql.shuffle.partitions determines the number of partitions.
    """
    get_spark().conf.set("spark.sql.shuffle.partitions", shuffle_partitions)
    # Disable broadcast joins so the joins below require a shuffle
    get_spark().sql("SET spark.sql.autoBroadcastJoinThreshold=-1")

    df1 = get_spark().range(1, 1000).repartition(data_partitions)
    df2 = get_spark().range(1, 2000).repartition(data_partitions)
    df3 = get_spark().range(1, 3000).repartition(data_partitions)

    print("Data partitions is: {}. Shuffle partitions is {}".format(data_partitions, shuffle_partitions))
    print("Data partitions before join: {}".format(df1.rdd.getNumPartitions()))

    df = (df1.join(df2, df1.id == df2.id)
          .join(df3, df1.id == df3.id))

    print("Data partitions after join (equal shuffle.partitions): {}".format(df.rdd.getNumPartitions()))
    df.explain(True)
Here's the output of the print statements:
Data partitions is: 10. Shuffle partitions is 4
Data partitions before join: 10
Data partitions after join (equal shuffle.partitions): 4
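(One caveat worth noting: this describes classic shuffle behavior. On newer Spark versions with Adaptive Query Execution enabled, the default since Spark 3.2, post-shuffle partitions may be coalesced at runtime, so the final count can be smaller than spark.sql.shuffle.partitions.)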
Upvotes: 0
Reputation: 1233
My understanding is YES. Spark has several optimizations that avoid unnecessary shuffles. Let's consider some examples:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

lazy val spark: SparkSession =
  SparkSession
    .builder()
    .appName(getClass.getSimpleName)
    .master("local[2]")
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate()

// Disable broadcast joins so the joins below are sort-merge joins requiring a shuffle
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")

import spark.implicits._
val df1 = spark.range(1, 100)
val df2 = spark.range(1, 200)
val df3 = spark.range(1, 300)
df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))
  .explain(true)
And its physical plan:
== Physical Plan ==
*SortMergeJoin [id#5L], [id#11L], Inner
:- *SortMergeJoin [id#5L], [id#8L], Inner
: :- *Sort [id#5L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#5L, 5)
: : +- *Range (1, 100, step=1, splits=2)
: +- *Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 5)
: +- *Range (1, 200, step=1, splits=2)
+- *Sort [id#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#11L, 5)
+- *Range (1, 300, step=1, splits=2)
As you can see, each Dataset was repartitioned only once, and the result of the df1/df2 join was not repartitioned a second time. That is the default behavior, with the number of partitions borrowed from .config("spark.sql.shuffle.partitions", "5").
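To confirm the resulting partition count programmatically (a quick sketch against the session above):

val joined = df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))

// Both shuffles used spark.sql.shuffle.partitions, so the joined result has 5 partitions
println(joined.rdd.getNumPartitions) // 5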
However, it gets interesting if you manually repartition df3 with a number of partitions lower than the spark.sql.shuffle.partitions value that was used for df1.join(df2, ...):
val df3 = spark.range(1, 300).repartition(3, col("id"))
df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))
  .explain(true)
And its physical plan:
== Physical Plan ==
*SortMergeJoin [id#5L], [id#11L], Inner
:- *SortMergeJoin [id#5L], [id#8L], Inner
: :- *Sort [id#5L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#5L, 5)
: : +- *Range (1, 100, step=1, splits=2)
: +- *Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 5)
: +- *Range (1, 200, step=1, splits=2)
+- *Sort [id#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#11L, 5)
+- *Range (1, 300, step=1, splits=2)
As you can see, the picture is the same: Spark gracefully ignores the repartition(3, col("id")) and shuffles df3 into 5 partitions instead.

Now let's repartition df3 with a higher number of partitions:

val df3 = spark.range(1, 300).repartition(10, col("id"))
df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))
  .explain(true)
And its physical plan:
== Physical Plan ==
*SortMergeJoin [id#5L], [id#11L], Inner
:- *Sort [id#5L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#5L, 10)
: +- *SortMergeJoin [id#5L], [id#8L], Inner
: :- *Sort [id#5L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#5L, 5)
: : +- *Range (1, 100, step=1, splits=2)
: +- *Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 5)
: +- *Range (1, 200, step=1, splits=2)
+- *Sort [id#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#11L, 10)
+- *Range (1, 300, step=1, splits=2)
As you can see, one extra repartition happened on the result of joining df1 and df2: since df3 now has hashpartitioning(id, 10) while the join output has hashpartitioning(id, 5), Spark re-shuffled the df1/df2 result into 10 partitions to match df3.
NOTE: manual repartitioning requires using the same partitionExprs when calling the repartition function; otherwise there will be an unnecessary shuffle.
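For example (a sketch; the modulo expression is just an arbitrary non-key expression), repartitioning by anything other than the join key does not satisfy the join's required distribution and triggers another shuffle:

// Partitioned by an expression other than the join key
val df3 = spark.range(1, 300).repartition(5, col("id") % 7)

df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))
  .explain(true)
// The plan now contains an extra Exchange hashpartitioning(id#..., 5) above df3,
// because hashpartitioning((id % 7), 5) does not satisfy the join's requirement on id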
To summarize, Spark has good optimizations and applies them whenever it can, so be careful when you involve custom partitioning code, and verify everything, at least using explain.
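If you prefer to check for shuffles programmatically rather than reading plans by eye, one option (a sketch using Spark's internal execution API, which may differ between versions) is to count the Exchange operators in the physical plan:

import org.apache.spark.sql.execution.exchange.Exchange

val plan = df1
  .join(df2, df1("id") === df2("id"))
  .join(df3, df1("id") === df3("id"))
  .queryExecution.executedPlan

// Every Exchange node in the physical plan corresponds to a shuffle
val numShuffles = plan.collect { case e: Exchange => e }.size
println(s"Shuffles in plan: $numShuffles")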
Hope it helps!
Upvotes: 7