Reputation: 5526
I have a large Spark DataFrame of around 25 GB that I have to join with another DataFrame of about 15 GB.
When I run the code, it takes around 15 minutes to complete.
The resource allocation is 40 executors with 128 GB of memory each.
When I went through its execution plan, the sort merge join was being performed.
The problem is:
The join is performed around 5 to 6 times on the same key but against different tables. Because of that, most of the time is spent sorting the data and co-locating the partitions before merging/joining the data, for every single join.
So is there any way to sort the data before performing the join, so that the sort operation is not performed for each join, or to optimize it so that less time is spent sorting and more time actually joining the data?
I just want to sort my DataFrame before performing the join, but I am not sure how to do it.
For example, if my DataFrame is joining on the id column:
joined_df = df1.join(df2, df1.id == df2.id)
How can I sort the DataFrame on 'id' before joining so that the partitions are co-located?
Upvotes: 5
Views: 3908
Reputation: 1455
I've had good results in the past by repartitioning the input DataFrames by the join column. While this doesn't avoid a shuffle, it does make the shuffle explicit, allowing you to choose the number of partitions specifically for the join (as opposed to setting spark.sql.shuffle.partitions, which applies to all joins).
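A minimal sketch, assuming df1 and df2 are your DataFrames and "id" is the join key; the partition count of 200 is purely illustrative and should be tuned for your data:

num_partitions = 200  # chosen for this join, independent of spark.sql.shuffle.partitions

# Repartitioning by the join column makes the shuffle explicit
df1_repart = df1.repartition(num_partitions, "id")
df2_repart = df2.repartition(num_partitions, "id")

joined_df = df1_repart.join(df2_repart, "id")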
Bucketing is a useful technique if you need to read a dataset multiple times over multiple jobs, when the cost of writing out to persistent storage pays off.
Upvotes: 3
Reputation: 74619
So is there any way to sort the data before performing the join, so that the sort operation is not performed for each join, or to optimize it so that less time is spent sorting and more time actually joining the data?
That smells like bucketing.
Bucketing is an optimization technique that uses buckets (and bucketing columns) to determine data partitioning and avoid data shuffle.
The idea is to bucketBy the datasets so Spark knows that the keys are co-located (pre-shuffled already). The number of buckets and the bucketing columns have to be the same across the DataFrames participating in the join.
Please note that this is supported for Hive or Spark tables (saveAsTable), as the bucket metadata is fetched from a metastore (Spark's or Hive's).
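A minimal sketch, assuming hypothetical table names and a bucket count of 16; both sides must be bucketed on the same column with the same number of buckets:

# Write both DataFrames as bucketed (and sorted) tables in the metastore
df1.write.bucketBy(16, "id").sortBy("id").saveAsTable("bucketed_table1")
df2.write.bucketBy(16, "id").sortBy("id").saveAsTable("bucketed_table2")

t1 = spark.table("bucketed_table1")
t2 = spark.table("bucketed_table2")

# With matching buckets on the join key, the sort-merge join can skip the
# exchange, so the data is not reshuffled for every join
joined_df = t1.join(t2, "id")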
Upvotes: 4