Reputation: 347
I have two RDDs, one really large and the other much smaller. I'd like to find all unique tuples in the large RDD whose keys appear in the small RDD.
For example:
large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
expected_rdd = [
    ('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
    ('b', [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]
There are two expensive operations in my solution - join and distinct. I assume both cause a full shuffle and leave the child RDD hash partitioned. Given that, is the following the best I can do?
keys = sc.broadcast(small_rdd.keys().distinct().collect())
filtered_unique_large_rdd = (large_rdd
    .filter(lambda kv: kv[0] in keys.value)
    .distinct()
    .groupByKey())

(filtered_unique_large_rdd
    .join(small_rdd.groupByKey())
    .mapValues(lambda x: sum([list(i) for i in x], []))
    .collect())
Basically, I filter the tuples explicitly, take the distinct ones and then join with small_rdd. I hope that the distinct operation will leave the keys hash partitioned and will not cause another shuffle during the subsequent join.
Thanks in advance for any suggestions/ideas.
PS: It is not a duplicate of "Which function in spark is used to combine two RDDs by keys", since a join (full shuffle) is an option there.
Upvotes: 2
Views: 1413
Reputation: 330183
There are two expensive operations in my solution - join and distinct.
Actually there are three expensive operations. You should add groupByKey to the list.
I hope that that distinct operation will place the keys hash partitioned and will not cause another shuffle during the subsequent join.
distinct won't, but the subsequent groupByKey will. The problem is that it requires your data to be shuffled twice - once for distinct and once for groupByKey.
filtered_unique_large_rdd.toDebugString()
## (8) PythonRDD[27] at RDD at PythonRDD.scala:43 []
## | MapPartitionsRDD[26] at mapPartitions at PythonRDD.scala:374 []
## | ShuffledRDD[25] at partitionBy at NativeMethodAccessorImpl.java:-2 []
## +-(8) PairwiseRDD[24] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
## | PythonRDD[23] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
## | MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:374 []
## | ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:-2 []
## +-(8) PairwiseRDD[20] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
## | PythonRDD[19] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
## | ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []
You can try to replace distinct followed by groupByKey with aggregateByKey:
zeroValue = set()

def seqFunc(acc, x):
    # add a single value to the per-key set
    acc.add(x)
    return acc

def combFunc(acc1, acc2):
    # merge two per-key sets built on different partitions
    acc1.update(acc2)
    return acc1

grouped_by_aggregate = (large_rdd
    .filter(lambda kv: kv[0] in keys.value)
    .aggregateByKey(zeroValue, seqFunc, combFunc))
Compared to your current solution it has to shuffle large_rdd only once:
grouped_by_aggregate.toDebugString()
## (8) PythonRDD[54] at RDD at PythonRDD.scala:43 []
## | MapPartitionsRDD[53] at mapPartitions at PythonRDD.scala:374
## | ShuffledRDD[52] at partitionBy at NativeMethodAccessorImpl.java:-2 []
## +-(8) PairwiseRDD[51] at aggregateByKey at <ipython-input-60-67c93b2860a0 ...
## | PythonRDD[50] at aggregateByKey at <ipython-input-60-67c93b2860a0> ...
## | ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []
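If you still need to merge in the small_rdd values afterwards, a rough sketch (reusing your original join and mimicking the merge from your mapValues) could look like this:

# Rough sketch: join the aggregated sets with the grouped small_rdd values
# and flatten both sides into a single list, like the mapValues in your code.
result = (grouped_by_aggregate
    .join(small_rdd.groupByKey())
    .mapValues(lambda x: list(x[0]) + list(x[1]))
    .collect())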
Another possible improvement is to convert the keys to a set before broadcasting:
keys = sc.broadcast(set(small_rdd.keys().distinct().collect()))
Right now your code performs a linear search over the list for every element processed by the filter, while a set gives you an average O(1) membership test.
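With the set in place the filter itself stays the same, for example (filtered_large_rdd is just an illustrative name):

# Same filter as before; kv[0] in keys.value is now a hash lookup into the
# broadcast set instead of a linear scan over a list.
filtered_large_rdd = large_rdd.filter(lambda kv: kv[0] in keys.value)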
Upvotes: 1