Reputation: 383
Below is an analogous schema:
Table 1 - Players (up to 10M players per team, HUGE)
player_number - INT
team_id - BIGINT (indexed)

Table 2 - Team (relatively small table, which I want to broadcast)
team_id - BIGINT (indexed)
team_size - INT
Use case - I want to join the Players and Team tables on team_id. The Players table can have a large skew on team_id: the number of players per team_id ranges from 10 to 10M.
Approach 1 - I repartition both players and team on team_id and do a sort-merge join. Repartitioning players takes close to 2.5 minutes due to the large skew, and the join itself takes about 3 seconds, so the overall time is close to 2.6 minutes.
Approach 2 - The teams and players tables are uniformly distributed (not partitioned on team_id), and I join them directly without repartitioning. This takes around 20 seconds, so the overall time is 20 seconds.
Question - Why does the non-repartitioned join take less time than the repartitioned one? In a sort-merge join, won't Spark do an internal repartitioning (shuffle) on team_id anyway? How is that different from Approach 1, given that the skew is the same in both cases?
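To make the skew concrete, here is a plain-Python sketch (not Spark code; the partition count and row counts are made up for illustration) of why hash-partitioning on team_id is slow under skew: every row with the same team_id must land in the same partition, so one hot key produces one huge straggler partition.

```python
from collections import Counter

NUM_PARTITIONS = 8

def partition_for(team_id: int) -> int:
    # Hash partitioning, as a shuffle on team_id would do:
    # all rows with the same team_id map to the same partition.
    return hash(team_id) % NUM_PARTITIONS

# Toy skewed data: team 1 is "hot" with 10_000 players,
# teams 2..9 have only 10 players each (numbers are illustrative).
rows = [(player, 1) for player in range(10_000)]
for team in range(2, 10):
    rows += [(player, team) for player in range(10)]

sizes = Counter(partition_for(team_id) for _, team_id in rows)
# The partition holding team 1 receives at least 10_000 rows while
# the others stay tiny -- that straggler dominates the shuffle time.
print(sizes.most_common(1))
```

The same effect, scaled up to a 10M-row hot key, is what makes the explicit repartition in Approach 1 take minutes.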
Upvotes: 0
Views: 458
Reputation: 6998
We would need to look at the physical plan (via .explain() on the joined DataFrame) to be completely sure, but here is my assumption:
In Approach 2, Spark detects that Table 2 (Team) is small enough to be broadcast to every executor (controlled by spark.sql.autoBroadcastJoinThreshold, 10 MB by default). Each worker can then join its local Players partitions against the broadcast copy on its own, avoiding the shuffle entirely and reducing the overall execution time.
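Conceptually, a broadcast hash join works like the plain-Python sketch below (this is an illustration of the idea, not Spark's actual implementation; all names and values are made up): the small Team table is shipped to every executor as a hash map, and each Players partition is joined locally, so no rows move between partitions.

```python
# Small side: Team(team_id -> team_size), broadcast as a dict.
team = {1: 10_000, 2: 10, 3: 10}
broadcast_map = dict(team)  # in Spark, this copy is shipped to every executor

# Big side: Players(player_number, team_id), already split into partitions.
players_partitions = [
    [(100, 1), (101, 2)],
    [(102, 1), (103, 3), (104, 4)],  # team_id 4 has no match -> dropped
]

def join_partition(partition):
    # Each executor joins its own partition against the broadcast map,
    # so no row ever crosses a partition boundary (no shuffle).
    return [(player, tid, broadcast_map[tid])
            for player, tid in partition if tid in broadcast_map]

joined = [row for part in players_partitions for row in join_partition(part)]
print(joined)
# [(100, 1, 10000), (101, 2, 10), (102, 1, 10000), (103, 3, 10)]
```

Because the big side is never moved, the skew on team_id does not matter for the broadcast join: each partition is processed where it already sits.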
In Approach 1 you are forcing Spark to do a full shuffle by repartitioning explicitly, which is not necessary when a broadcast join is possible. If Spark does not pick the broadcast join on its own, you can request it with the broadcast() hint from pyspark.sql.functions.
Upvotes: 3