Reputation: 691
I am writing an application using the Spark Dataset API in a Databricks notebook.
I have two tables: one has 1.5 billion rows and the other has 2.5 million. Both contain telecommunication data, and the join is done on the country code and the first 5 digits of a number. The output has 55 billion rows. The problem is that the data is skewed, which gives me long-running tasks. No matter how I repartition the dataset, I still get long-running tasks because the hashed keys are unevenly distributed.
I have tried broadcast joins, persisting the big table's partitions in memory, and so on.
What are my options here?
Upvotes: 4
Views: 3024
Reputation: 4127
Spark repartitions the data based on the join key, so repartitioning before the join won't change the skew (it only adds an unnecessary shuffle).
If you know which key is causing the skew (usually it is something like null, 0, or ""), split your data into two parts: one dataset with the skewed key and another with the rest.
Then join the matching sub-datasets and union the results.
For example:
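val df1 = ...
val df2 = ...
// Known skewed key value. For a null key use isNull / isNotNull instead:
// === null and =!= null evaluate to null, so those filters drop every row.
val skewKey = ""
val df1Skew = df1.where($"key" === skewKey)
val df2Skew = df2.where($"key" === skewKey)
val df1NonSkew = df1.where($"key" =!= skewKey)
val df2NonSkew = df2.where($"key" =!= skewKey)
// Every row on both sides has the same key, so this join is a cross join.
// Drop the duplicate key column so the schema matches dfNonSkew.
val dfSkew = df1Skew.crossJoin(df2Skew.drop("key"))
val dfNonSkew = df1NonSkew.join(df2NonSkew, "key")
// unionByName (Spark 2.3+) matches columns by name rather than by position.
val res = dfSkew.unionByName(dfNonSkew)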
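If you don't already know which key is skewed, counting rows per key on the big table is one way to find out. This is only a minimal sketch: `topKeys` is a hypothetical helper name, the column name `key` and the toy data are assumptions, and on the real table you may prefer to run it on a sample.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc}

// Reuses the notebook's session if one exists; the local master is only for standalone runs.
val spark = SparkSession.builder().master("local[*]").appName("skew-check").getOrCreate()
import spark.implicits._

// Hypothetical helper: the n most frequent values of keyCol with their row counts.
def topKeys(df: DataFrame, keyCol: String, n: Int): DataFrame =
  df.groupBy(col(keyCol)).count().orderBy(desc("count")).limit(n)

// Toy data standing in for the large table: the empty-string key dominates.
val sample = Seq("", "", "", "", "a", "b").toDF("key")
val hottest = topKeys(sample, "key", 1).collect()
```

A key whose count is far above the rest is a candidate for `skewKey` in the split above.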
Upvotes: 5