mlee_jordan

Reputation: 842

Is it possible to join two RDDs on their values to avoid expensive shuffling?

I have two RDDs, each with two columns in (K, V) form. In the source files for those RDDs the keys appear one under the other, and each row assigns a different, distinct value to its key. A pseudo sample of the text files used to create the RDDs is given at the bottom of this post.

The keys are completely different in the two RDDs, and I would like to join the RDDs on their values to find how many common values exist for each key pair. For example, I am trying to reach a result such as (1-5, 10), meaning that key "1" from RDD1 and key "5" from RDD2 share 10 values.

I work on a single machine with 256 GB of RAM and 72 cores. One text file is 500 MB while the other is 3 MB.

Here is my code:

val conf = new SparkConf().setAppName("app").setMaster("local[*]")
  .set("spark.shuffle.spill", "true")
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.executor.memory", "128g")
  .set("spark.driver.maxResultSize", "0")

// RDD1: (key, value) pairs from the first file
val RDD1 = sc.textFile("\\t1.txt", 1000).map { line => val s = line.split("\t"); (s(0), s(1)) }

// RDD2: (value, key) pairs from the second file, so it can later be grouped by value
val RDD2 = sc.textFile("\\t2.txt", 1000).map { line => val s = line.split("\t"); (s(1), s(0)) }


// Broadcast the smaller RDD (RDD2) as a map from each value to the RDD2 keys that carry it.
val rdd2BC = sc.broadcast(RDD2.groupByKey.collectAsMap)

// Map-side "join": for every (key, value) of RDD1, look the value up in the
// broadcast map and emit one ("rdd1Key-rdd2Key", 1) pair per match.
val joined = RDD1.mapPartitions(iter => for {
  (k, v1) <- iter
  v2 <- rdd2BC.value.getOrElse(v1, Iterable())
} yield (s"$k-$v2", 1))

joined.foreach(println)

// Count how many values each (rdd1Key, rdd2Key) pair has in common.
val result = joined.reduceByKey((a, b) => a + b)

I try to manage this issue with a broadcast variable, as seen in the script. If I join RDD2 (which has 250K rows) with itself, the pairs end up in the same partitions, so less shuffling takes place and the result comes back in about 3 minutes. However, when joining RDD1 against RDD2, the pairs are scattered across partitions, which results in a very expensive shuffle, and the job always ends with the error:

ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 168591 ms
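
As a side note, the timeout in that message is governed by Spark's heartbeat/network settings, which can be raised while debugging; this only hides the symptom and does not remove the expensive shuffle. A minimal sketch, assuming spark.network.timeout and spark.executor.heartbeatInterval are honored by the Spark version in use:

conf.set("spark.network.timeout", "600s")          // assumption: setting available in this Spark version
  .set("spark.executor.heartbeatInterval", "60s")  // keep well below spark.network.timeout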

Below is a pseudo sample of my data, followed by a small worked example and the results it produces:

PSEUDO DATA SAMPLE:

KEY VALUE
1   13894
1   17376
1   15688
1   22434
1   2282
1   14970
1   11549
1   26027
1   2895
1   15052
1   20815
2   9782
2   3393
2   11783
2   22737
2   12102
2   10947
2   24343
2   28620
2   2486
2   249
2   3271
2   30963
2   30532
2   2895
2   13894
2   874
2   2021
3   6720
3   3402
3   25894
3   1290
3   21395
3   21137
3   18739
...

A SMALL EXAMPLE

RDD1

2   1
2   2
2   3
2   4
2   5
2   6
3   1
3   6
3   7
3   8
3   9
4   3
4   4
4   5
4   6

RDD2

21  1
21  2
21  5
21  11
21  12
21  10
22  7
22  8
22  13
22  9
22  11

JOIN RESULTS BASED ON THIS DATA:

(3-22,1)
(2-21,1)
(3-22,1)
(2-21,1)
(3-22,1)
(4-21,1)
(2-21,1)
(3-21,1)
(3-22,1)
(3-22,1)
(2-21,1)
(3-22,1)
(2-21,1)
(4-21,1)
(2-21,1)
(3-21,1)

REDUCEBYKEY RESULTS:

(4-21,1)
(3-21,1)
(2-21,3)
(3-22,3)
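
For reference, here is a self-contained sketch of the broadcast approach above applied to this small example (the parallelize-based input and the sample-prefixed variable names are only for illustration); it should reproduce the REDUCEBYKEY RESULTS listed above:

// Small example rebuilt in-memory: sampleRDD1 holds (key, value) pairs,
// sampleRDD2 is swapped to (value, key) so that grouping by key yields
// a value -> RDD2-keys lookup map, as in the code above.
val sampleRDD1 = sc.parallelize(Seq(
  (2, 1), (2, 2), (2, 3), (2, 4), (2, 5), (2, 6),
  (3, 1), (3, 6), (3, 7), (3, 8), (3, 9),
  (4, 3), (4, 4), (4, 5), (4, 6)))

val sampleRDD2 = sc.parallelize(Seq(
  (21, 1), (21, 2), (21, 5), (21, 11), (21, 12), (21, 10),
  (22, 7), (22, 8), (22, 13), (22, 9), (22, 11))
).map { case (k, v) => (v, k) } // swap to (value, key)

val sampleBC = sc.broadcast(sampleRDD2.groupByKey.collectAsMap)

val sampleJoined = sampleRDD1.mapPartitions(iter => for {
  (k, v1) <- iter
  k2 <- sampleBC.value.getOrElse(v1, Iterable())
} yield (s"$k-$k2", 1))

// Should print (4-21,1), (3-21,1), (2-21,3), (3-22,3) in some order.
sampleJoined.reduceByKey(_ + _).collect.foreach(println)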

Upvotes: 0

Views: 1273

Answers (1)

Rohan Aletty

Reputation: 2442

Have you looked at using a cartesian join? You could maybe try something like below:

val rdd1 = sc.parallelize(for { x <- 1 to 3; y <- 1 to 5 } yield (x, y)) // sample RDD
val rdd2 = sc.parallelize(for { x <- 1 to 3; y <- 3 to 7 } yield (x, y)) // sample RDD with slightly displaced values from the first

val g1 = rdd1.groupByKey()
val g2 = rdd2.groupByKey()

val cart = g1.cartesian(g2).map { case ((key1, values1), (key2, values2)) => 
             ((key1, key2), (values1.toSet & values2.toSet).size) 
           }

When I try running the above example in a cluster, I see the following:

scala> rdd1.take(5).foreach(println)
...
(1,1)
(1,2)
(1,3)
(1,4)
(1,5)
scala> rdd2.take(5).foreach(println)
...
(1,3)
(1,4)
(1,5)
(1,6)
(1,7)
scala> cart.take(5).foreach(println)
...
((1,1),3)
((1,2),3)
((1,3),3)
((2,1),3)
((2,2),3)

The result indicates that for each (key1, key2) pair there are 3 matching elements between the two value sets. Note that the count is always 3 here because the value ranges of the two sample RDDs (1 to 5 and 3 to 7) overlap in exactly three elements.
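
If the output should match the (key1-key2, count) shape from the question and only include key pairs that actually share values, a small follow-up on cart (just a sketch) could be:

// Keep only key pairs with at least one common value and format them like
// the (k1-k2, count) pairs shown in the question.
val overlapping = cart
  .filter { case (_, common) => common > 0 }
  .map { case ((k1, k2), common) => (s"$k1-$k2", common) }

overlapping.take(5).foreach(println)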

The cartesian transformation does not cause a shuffle either, since it just iterates over the elements of each RDD and produces their cartesian product. You can see this by calling toDebugString on an example:

scala> val carts = rdd1.cartesian(rdd2)
carts: org.apache.spark.rdd.RDD[((Int, Int), (Int, Int))] = CartesianRDD[9] at cartesian at <console>:25

scala> carts.toDebugString
res11: String =
(64) CartesianRDD[9] at cartesian at <console>:25 []
 |   ParallelCollectionRDD[1] at parallelize at <console>:21 []
 |   ParallelCollectionRDD[2] at parallelize at <console>:21 []
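
For comparison (not part of the run above), the lineage of a regular key-based join on the same sample RDDs should show a shuffle boundary, since join is implemented on top of cogroup; a rough sketch:

// Unlike the CartesianRDD lineage above, a plain join is expected to show a
// CoGroupedRDD behind a shuffle dependency in its debug string.
val joinedLineage = rdd1.join(rdd2)
println(joinedLineage.toDebugString)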

Upvotes: -1
