Reputation: 115
I'm trying to dedupe some large, skewed data consisting of uids, timestamps, and session_ids; the session_ids are fairly low cardinality. I'm deduping the Spark DF across these 3 columns. There is significant skew because this is observational data: some days have a lot more observations than others, which seems to hurt performance. Generally, what is the best way to approach this?
I've tried drop_duplicates(< 3 columns here>), I've tried row_number (my current approach, roughly as sketched below), and I've tried an agg/group by, but I'm not sure that will work because this DF has 3 additional columns that I would like to retain.
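For reference, the row_number version currently in use looks roughly like this (uid, ts, session_id, and extra_col1 are placeholders for the real schema):
from pyspark.sql import Window
from pyspark.sql import functions as F

# Window over the three dedup key columns; the order column is just an
# arbitrary tie-breaker since only the first row per key is kept.
w = Window.partitionBy("uid", "ts", "session_id").orderBy("extra_col1")

deduped = (
    df.withColumn("rn", F.row_number().over(w))
      .filter(F.col("rn") == 1)
      .drop("rn")
)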
Appreciate any advice!
Upvotes: 1
Views: 45
Reputation: 38335
Spark partitions the data according to the PARTITION BY specified in row_number(). If the data is skewed, some partitions are very big and do not fit in memory.
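One way to confirm the skew is to count rows per partition key and look at the largest groups (a sketch, with col1 and col2 standing in for the real key columns):
# A handful of keys with very large counts indicates skew.
spark.sql("""
    select col1, col2, count(*) as cnt
    from mytable
    group by col1, col2
    order by cnt desc
    limit 20
""").show()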
The solution is a two-stage row_number deduplication: in the first stage, salt the partition key with a random value; in the second stage, apply row_number again without the random salt.
Example:
This version is skewed and runs very slowly:
df1 = spark.sql("""
select * from
(
  select t.*,
         row_number() over (partition by col1, col2 order by col3) rn
  from mytable t
) s
where rn = 1
""")
Solution:
Add FLOOR(RAND()*100)%20 to the PARTITION BY: it produces random integers from 0 to 19, splitting each skewed partition into up to 20 additional parts. Then deduplicate within those salted partitions.
df1 = spark.sql("""
select col1, col2, col3, coln from
(
  select t.*,
         row_number() over (partition by col1, col2, FLOOR(RAND()*100)%20 order by col3) rn
  from mytable t
) s
where rn = 1
""")
After that, deduplicate again without the random number. Each key in a skewed partition is left with at most 20 rows (one per salt value), so the second pass does not create much load. Note that col3 is kept in the first stage so the second pass can still order by it.
df1.createOrReplaceTempView("stage1")
df2=spark.sql("select col1, col2, coln from
(
select t.*,
row_number() over(partition by col1, col2 order by col3) rn
from stage1 t) s where rn=1")
This two-stage approach runs fast even on skewed data.
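The same two-stage pattern can also be written with the DataFrame API if you prefer that over SQL. A sketch, reusing the placeholder column names col1, col2, col3 from the example above:
from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.table("mytable")

# Stage 1: salt the partition key with a random bucket 0-19 to split skewed keys.
salted = df.withColumn("salt", (F.floor(F.rand() * 100) % 20).cast("int"))
w1 = Window.partitionBy("col1", "col2", "salt").orderBy("col3")
stage1 = (
    salted.withColumn("rn", F.row_number().over(w1))
          .filter(F.col("rn") == 1)
          .drop("rn", "salt")
)

# Stage 2: at most 20 rows per key remain, so the unsalted pass is cheap.
w2 = Window.partitionBy("col1", "col2").orderBy("col3")
deduped = (
    stage1.withColumn("rn", F.row_number().over(w2))
          .filter(F.col("rn") == 1)
          .drop("rn")
)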
Upvotes: 0