hyperc54

Reputation: 315

Spark >2 - Custom partitioning key during join operation

I am wondering whether we can force Spark to use a custom partitioning key during a join operation with two dataframes.

For example, let's consider

df1: DataFrame - [groupid, other_column_a]
df2: DataFrame - [groupid, other_column_b]

If I run

df_join = df1.join(df2, "groupid")

Spark will set "groupid" as the partitioning key and perform the join on each partition. The problem is, this can run out of memory on a machine if a partition is too big.

However, it seems theoretically possible to perform the join with, say, (groupid, other_column_a) as the partitioning key (to reduce the size of each partition).

Is it possible to do this with Spark? I tried df1.repartition("groupid", "other_column_a") upfront, but this is overridden by the join (I checked with df_join.explain()). I can't find any resource online that explains how to do this.
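
For reference, a minimal PySpark sketch of the setup above (toy data; broadcast joins disabled so the plan shows the shuffle behaviour being discussed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Disable broadcast joins so the small toy data still goes through a shuffle join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ["groupid", "other_column_a"])
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ["groupid", "other_column_b"])

# Pre-partitioning by (groupid, other_column_a) ...
df1_repart = df1.repartition("groupid", "other_column_a")

# ... is overridden: the join re-shuffles both sides by groupid alone.
df_join = df1_repart.join(df2, "groupid")
df_join.explain()  # plan shows Exchange hashpartitioning(groupid, ...) on both sides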

Thanks!

Visual explanation

Upvotes: 3

Views: 1132

Answers (2)

vikrant rana

Reputation: 4649

If you are joining on an integer id column, you can partition your dataframes by id modulo some number, i.e. however many partitions you want. That way, ids that share the same hash value are grouped together in one partition. You can then break the operation down into multiple joins, joining each partition serially in a loop, as in the sketch below. I have explained this case in detail in Efficient pyspark join.
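
A minimal PySpark sketch of that idea, assuming groupid is an integer and reusing df1 and df2 from the question (num_buckets is an illustrative choice, not from the answer):

from functools import reduce
from pyspark.sql import functions as F

num_buckets = 8  # how many buckets/partitions you want

# Tag each row with groupid modulo the bucket count.
df1_b = df1.withColumn("bucket", F.col("groupid") % num_buckets)
df2_b = df2.withColumn("bucket", F.col("groupid") % num_buckets)

# Join bucket by bucket, serially, then union the partial results.
partial_joins = []
for b in range(num_buckets):
    left = df1_b.filter(F.col("bucket") == b).drop("bucket")
    right = df2_b.filter(F.col("bucket") == b).drop("bucket")
    partial_joins.append(left.join(right, "groupid"))

df_join = reduce(lambda a, b: a.unionByName(b), partial_joins)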

Upvotes: 3

user10722654

Reputation: 11

However, it seems theoretically possible to perform the join with say (groupid, other_column_a)

That's not correct. To perform the join, Spark has to move all records with the same groupid to a single partition, so using (groupid, other_column_a) would only be possible if:

  • (groupid, other_column_a) were the join keys, or
  • there was a functional relationship between other_column_a and groupid.

The first condition is clearly not satisfied, as you join only by groupid; the second wouldn't resolve the problem, as the distribution would be the same or worse.

There are other possible solutions for skewed joins, like separate handling of the skewed groups or iterative broadcast joins (see the answer and comments in Spark final task takes 100x times longer than first 199, how to improve); a rough sketch of the iterative broadcast approach follows below.
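
A minimal PySpark sketch of that iterative broadcast idea, reusing df1 and df2 from the question; the chunk count and the "chunk" column name are assumptions for illustration. The smaller dataframe is split into chunks, each chunk is broadcast-joined against the large one, and the partial results are unioned:

from functools import reduce
from pyspark.sql import functions as F

num_chunks = 4  # illustrative; pick so each chunk fits the broadcast size limit

# Assign each row of the smaller dataframe to a chunk.
df2_chunks = df2.withColumn("chunk", F.abs(F.hash("groupid")) % num_chunks)

pieces = []
for c in range(num_chunks):
    small_piece = df2_chunks.filter(F.col("chunk") == c).drop("chunk")
    # Broadcast hash join: avoids shuffling df1 by groupid
    # (the trade-off is that df1 is scanned once per chunk).
    pieces.append(df1.join(F.broadcast(small_piece), "groupid"))

df_join = reduce(lambda a, b: a.unionByName(b), pieces)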

Upvotes: 1
