Reputation: 5457
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)], numSlices=8)
rdd2 = rdd1.mapValues(lambda x: x)
These RDDs have the same partitioning:
rdd1.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]
rdd2.keys().glom().collect()
>>> [[], ['a'], [], ['b'], [], ['c'], [], ['d']]
There are multiple answers here on SO suggesting that joining co-partitioned data will not cause a shuffle, which makes a lot of sense to me. Example: Does a join of co-partitioned RDDs cause a shuffle in Apache Spark?
However, when I join these co-partitioned RDDs using PySpark, the data is shuffled into a new partition:
rdd1.join(rdd2).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], [], [], [], [], [], [], [], [], []]
And the partitioning changes even when I set the number of new partitions to the original 8:
rdd1.join(rdd2, numPartitions=8).keys().glom().collect()
>>> [['a'], [], ['c'], ['b'], [], ['d'], [], []]
How come I can't avoid a shuffle using these co-partitioned RDDs?
I'm using Spark 1.6.0.
Upvotes: 4
Views: 1170
Reputation: 330063
In this case neither rdd1 nor rdd2 is partitioned:
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
rdd2 = rdd1.mapValues(lambda x: x)
rdd1.partitioner is None
# True
rdd2.partitioner is None
# True
so by definition they are not co-partitioned. While you could partition the data and then join:
n = rdd1.getNumPartitions()
rdd1part = rdd1.partitionBy(n)
rdd2part = rdd2.partitionBy(n)
rdd1part.join(rdd2part) # rdd1part and rdd2part are co-partitioned
this would simply rearrange the DAG (partitionBy itself is a shuffle) and won't prevent a shuffle.
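For a quick sanity check, the same partitioner test from above now passes. This is a minimal sketch continuing from the snippet above; rdd3part is just an illustrative name:
rdd1part.partitioner is None
# False
rdd2part.partitioner is None
# False

# mapValues declares that it preserves partitioning, so deriving an RDD from an
# already partitioned one keeps the partitioner instead of dropping it:
rdd3part = rdd1part.mapValues(lambda x: x)
rdd3part.partitioner is None
# False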
See also Default Partitioning Scheme in Spark
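If you want to inspect this yourself, toDebugString() prints an RDD's lineage, so you can compare where the shuffle stages sit in the two joins (the exact output format varies between Spark versions):
# The partitionBy steps show up in the second lineage - the shuffle is moved
# earlier, not removed:
print(rdd1.join(rdd2, n).toDebugString())
print(rdd1part.join(rdd2part, n).toDebugString())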
Upvotes: 6