yurgis

Reputation: 4077

Merging equi-partitioned data frames in Spark

In Hadoop, a join/merge of large equi-partitioned data sets can be done without reshuffling or a reduce phase, simply by using a map-side join with CompositeInputFormat.
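For context, a rough sketch of that Hadoop-side setup (using the old mapred API; the input paths and key/value types here are only placeholder assumptions):

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.{JobConf, SequenceFileInputFormat}
import org.apache.hadoop.mapred.join.CompositeInputFormat

val conf = new JobConf()
// CompositeInputFormat merges the co-partitioned, sorted inputs inside the mappers
conf.setInputFormat(classOf[CompositeInputFormat[Text]])
// "outer" performs a full outer join on the keys, partition by partition,
// with no shuffle and no reduce phase
conf.set("mapred.join.expr",
  CompositeInputFormat.compose("outer",
    classOf[SequenceFileInputFormat[_, _]],
    "/data/left", "/data/right"))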

Trying to figure out how to do it in Spark:

import org.apache.spark.sql.functions.col

val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
    .repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
    .repartition(col("k")).cache()

val xy = x.join(y, x.col("k") === y.col("k"), "outer")

x.show()    y.show()    xy.show()

+---+---+   +---+---+   +----+----+----+----+
|  k|  v|   |  k|  v|   |   k|   v|   k|   v|
+---+---+   +---+---+   +----+----+----+----+
|  A|  6|   |  C| 12|   |   A|   4|null|null|
|  B|  5|   |  D| 11|   |   B|   3|null|null|
|  C|  4|   |  E| 10|   |   C|   2|   C|   8|
|  D|  3|   |  F|  9|   |   D|   1|   D|   7|
|  E|  2|   |  G|  8|   |null|null|   E|   6|
|  F|  1|   |  H|  7|   |null|null|   F|   5|
+---+---+   +---+---+   +----+----+----+----+

So far so good. But when I check the execution plan, I see "unnecessary" sorts:

xy.explain

== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
:  +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
   +- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None

Is it possible to avoid sorts here?

Edit

For reference, Hadoop has had this "feature" available since 2007: https://issues.apache.org/jira/browse/HADOOP-2085

Update

As Lezzar pointed out, repartition() alone is not sufficient to achieve an equi-partitioned, sorted state; it needs to be followed by sortWithinPartitions(). So this should do the trick:

val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
    .repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v")
    .repartition(col("k")).sortWithinPartitions(col("k")).cache()

val xy = x.join(y, x.col("k") === y.col("k"), "outer")

xy.explain()

== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None

No sorting anymore!

Upvotes: 3

Views: 1467

Answers (2)

Lezzar Walid

Reputation: 148

Why are you saying the sort is unnecessary? A merge join needs the data to be sorted. And IMHO, there is no better strategy than a merge join for performing a full outer join, unless one of your DataFrames is small enough to be broadcast.
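For illustration, a minimal sketch of the broadcast route, reusing x and y from the question. Note that Spark generally applies a broadcast hash join only to inner and one-sided outer joins, so a full outer join may still fall back to a sort-merge join even with the hint:

import org.apache.spark.sql.functions.broadcast

// hint Spark to broadcast the smaller side; an inner join is used here because
// broadcast hash joins do not apply to full outer joins
val xyInner = x.join(broadcast(y), x.col("k") === y.col("k"))
xyInner.explain()  // expect BroadcastHashJoin instead of SortMergeJoin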

Upvotes: 1

Srinivasarao Daruna

Reputation: 3374

Similar to map-side joins in Hadoop, Spark has the broadcast join, which ships the table data to all the workers, much like the distributed cache does in Hadoop MapReduce. Please refer to the Spark documentation or search for "Spark broadcast hash join". Unlike Hive, Spark takes care of it automatically, so there is no need to worry about it.

You will need to understand a few parameters for this, though.

-> spark.sql.autoBroadcastJoinThreshold: the size (in bytes) under which Spark broadcasts a table automatically (a quick way to set it is sketched below).
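For example, assuming an SQLContext/HiveContext named sqlContext (as created further down), the threshold can be adjusted or automatic broadcasting disabled; the 50 MB value below is only an illustration:

// raise the automatic broadcast threshold to roughly 50 MB (value is in bytes)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

// setting it to -1 disables automatic broadcast joins entirely
// sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")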

You can try the code below to get a feel for the broadcast join; you can also refer to the Spark documentation on broadcast joins or search online for more detail.

Example code to try:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
1) sqlContext.sql("CREATE TABLE IF NOT EXISTS tab5 (key INT, value STRING)")

2) sqlContext.sql("INSERT INTO tab5 select 1,\"srini\" from sr23");
(I created another table just to have a record to insert into this one; since Hive only supports INSERT INTO ... SELECT, I used this trick to get some data.) You can skip this step as well, since you only want to see the physical plan.

------ You can also use any Hive table that already exists instead; I am just simulating a Hive table here. ---

3) val srini_df1 = sqlContext.sql("ANALYZE TABLE tab5 COMPUTE STATISTICS NOSCAN");

4) val df2 = sc.parallelize(Seq((5,"F"), (6,"E"), (7,"sri"), (1,"test"))).toDF("key", "value")

5) val join_df = sqlContext.sql("SELECT * FROM tab5").join(df2,"key");

6) join_df.explain
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(530360) called with curMem=238151, maxMem=555755765
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 517.9 KB, free 529.3 MB)
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(42660) called with curMem=768511, maxMem=555755765
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 41.7 KB, free 529.2 MB)
16/03/15 22:40:09 INFO storage.BlockManagerInfo: Added broadcast_23_piece0 in memory on localhost:63721 (size: 41.7 KB, free: 529.9 MB)
16/03/15 22:40:09 INFO spark.SparkContext: Created broadcast 23 from explain at <console>:28
== Physical Plan ==
Project [key#374,value#375,value#370]
 BroadcastHashJoin [key#374], [key#369], BuildLeft
  HiveTableScan [key#374,value#375], (MetastoreRelation default, tab5, None)
  Project [_1#367 AS key#369,_2#368 AS value#370]
   Scan PhysicalRDD[_1#367,_2#368]

Upvotes: 0
