Julias

Reputation: 5892

Joining two clustered tables in spark dataset seems to end up with full shuffle

I have two hive clustered tables t1 and t2

CREATE EXTERNAL TABLE `t1`(
  `t1_req_id` string,
   ...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS
-- t2 looks similar, with the same number of buckets
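
Not part of the original post, but one way to confirm what bucketing metadata Spark's catalog actually sees for these tables is DESCRIBE FORMATTED (the table name follows the question):

    // "Num Buckets" and "Bucket Columns" rows appear in the detailed table
    // information when Spark's catalog knows the table is bucketed
    spark.sql("DESCRIBE FORMATTED t1").show(100, false)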

The insert part happens in hive

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table `t1` partition(t1_stats_date,t1_stats_hour)
   select *
   from t1_raw
   where t1_stats_date='2020-05-10' and t1_stats_hour='12' AND 
   t1_req_id is not null

The code looks like as following:

 val t1 = spark.table("t1").as[T1]
 val t2 = spark.table("t2").as[T2]
 val outDS = t1.joinWith(t2, t1("t1_req_id") === t2("t2_req_id"), "fullouter")
  .map { case (t1Obj, t2Obj) =>
    val t3:T3 = // do some logic
    t3 
  }
 outDS.toDF.write....

I see the projection in the DAG, but it seems that the job still does a full data shuffle. Also, while looking into the executor logs, I don't see it reading the same bucket of the two tables in one chunk, which is what I would expect to find.
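
As a sanity check (not from the original post), the shuffle can also be confirmed from the physical plan rather than the DAG UI, using the outDS Dataset from the snippet above:

    // explain(true) prints the parsed/analyzed/optimized/physical plans;
    // an "Exchange hashpartitioning(...)" node feeding the SortMergeJoin
    // means a full shuffle is planned; no Exchange above the scans means
    // the bucketing was reused
    outDS.explain(true)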

The spark.sql.sources.bucketing.enabled, spark.sessionState.conf.bucketingEnabled and spark.sql.join.preferSortMergeJoin flags are set.
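
For reference, a minimal way to check those flags at runtime (a shell sketch, assuming Spark 2.3.x where both settings default to true; spark.sessionState.conf.bucketingEnabled is the internal view of the first flag):

    spark.conf.get("spark.sql.sources.bucketing.enabled")   // expected "true"
    spark.conf.get("spark.sql.join.preferSortMergeJoin")    // expected "true"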

What am I missing? And why is there still a full shuffle if the tables are bucketed? The current Spark version is 2.3.1.


Upvotes: 3

Views: 483

Answers (1)

Alexander Pivovarov

Reputation: 4990

One possibility to check for here is a type mismatch on the join columns, e.g. the join column being a STRING in T1 and a BIGINT in T2. Even if both are integral types (e.g. one INT, the other BIGINT), Spark will still add a shuffle, because different types use different hash functions for bucketing.
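
A minimal sketch of how one might verify this (table and column names are taken from the question; the printed types are illustrative):

    // Compare the declared types of the join keys; they must match exactly,
    // otherwise Spark inserts an Exchange even though both tables are bucketed
    val t1Type = spark.table("t1").schema("t1_req_id").dataType
    val t2Type = spark.table("t2").schema("t2_req_id").dataType
    println(s"t1_req_id: $t1Type, t2_req_id: $t2Type")  // e.g. StringType vs LongType signals the mismatch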

Upvotes: 1
