raju kancharla

Reputation: 49

PySpark: update and insert records in a large Parquet file

I have 70M+ records (116 MB) in my data, with example columns ID, TransactionDate, CreationDate.

Here ID is the primary key column. I need to update my data with newly arriving Parquet files, each smaller than 50 MB.

Sample Input

ID  col1        col2
1   2021-01-01  2020-08-21
2   2021-02-02  2020-08-21

New Data

ID  col1        col2
1   2021-02-01  2020-08-21
3   2021-02-02  2020-08-21

Expected output rows

1 2021-02-01 2020-08-21 (Updated)
3 2021-02-02 2020-08-21 (Inserted)
2 2021-02-02 2020-08-21 (Remains Same)

I have tried various approaches, but none of them gives correct results with low shuffle read and write and a short execution time.

A few of my approaches:

  1. Inner join (updated records), left-anti join (inserted records) and left-anti join (unchanged records): takes 10 minutes to execute, with 9.5 GB shuffle read and 9.5 GB shuffle write.

I tried a partitionBy on creationDate, but I could not work out how to read the new data with the appropriate partition.

Please help me with a better approach in PySpark that takes less time, with less shuffle read and write.

Thanks in advance.

Upvotes: 2

Views: 1710

Answers (1)

Vincent Doba

Reputation: 5068

You cannot avoid some shuffle, but at least you can limit it by doing only one full outer join instead of one inner join and two anti joins.

First, add a new column updated to your new dataframe to mark whether a joined row comes from the new data. Then perform the full outer join. Finally, select the value of each column from the new or the old data according to the updated column. The code is as follows, with the old_data dataframe as the current data and the new_data dataframe as the updated data:

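from pyspark.sql import functions as F

# Key column(s) shared by both dataframes
join_columns = ['ID']

final_data = (
    new_data
    # Flag rows coming from new_data; the flag is null for rows found only in old_data
    .withColumn('updated', F.lit(True))
    .join(old_data, join_columns, 'full_outer')
    .select(
        [F.col(c) for c in join_columns] +
        # Take the new value when the flag is set, otherwise keep the old value
        [F.when(F.col('updated'), new_data[c]).otherwise(old_data[c]).alias(c)
         for c in old_data.columns if c not in join_columns]
    )
)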
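To check the selection rule against the sample rows from the question without starting Spark, the same logic can be modelled with plain Python dictionaries. This is only a sketch of the semantics, not of how Spark executes the join: the upsert function and the dict-based row layout are hypothetical names introduced for illustration.

```python
def upsert(old_rows, new_rows):
    """Plain-Python model of the full outer join upsert; rows are dicts with an 'ID' key."""
    old_by_id = {row["ID"]: row for row in old_rows}
    new_by_id = {row["ID"]: row for row in new_rows}
    merged = []
    # A full outer join keeps every ID that appears on either side.
    for key in sorted(old_by_id.keys() | new_by_id.keys()):
        updated = key in new_by_id  # plays the role of the 'updated' flag
        merged.append(new_by_id[key] if updated else old_by_id[key])
    return merged


# Sample rows taken from the question.
old_rows = [
    {"ID": 1, "col1": "2021-01-01", "col2": "2020-08-21"},
    {"ID": 2, "col1": "2021-02-02", "col2": "2020-08-21"},
]
new_rows = [
    {"ID": 1, "col1": "2021-02-01", "col2": "2020-08-21"},
    {"ID": 3, "col1": "2021-02-02", "col2": "2020-08-21"},
]

for row in upsert(old_rows, new_rows):
    print(row)
```

ID 1 takes its values from the new data, ID 2 keeps its old values, and ID 3 is inserted, which matches the expected output in the question.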

If you look at the execution plan by calling the .explain() method on the final_data dataframe, you can see that there are only two shuffles (the Exchange steps), one per joined dataframe:

== Physical Plan ==
*(5) Project [coalesce(ID#6L, ID#0L) AS ID#17L, CASE WHEN exists#12 THEN col1#7 ELSE col1#1 END AS col1#24, CASE WHEN exists#12 THEN col2#8 ELSE col2#2 END AS col2#25]
+- SortMergeJoin [ID#6L], [ID#0L], FullOuter
   :- *(2) Sort [ID#6L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#6L, 200), ENSURE_REQUIREMENTS, [id=#27]
   :     +- *(1) Project [ID#6L, col1#7, col2#8, true AS exists#12]
   :        +- *(1) Scan ExistingRDD[ID#6L,col1#7,col2#8]
   +- *(4) Sort [ID#0L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ID#0L, 200), ENSURE_REQUIREMENTS, [id=#32]
         +- *(3) Scan ExistingRDD[ID#0L,col1#1,col2#2]

If you look at the execution plan for your one inner join and two anti joins, you get six Exchange steps (one of them a ReusedExchange):

== Physical Plan ==
Union
:- *(5) Project [ID#0L, col1#7, col2#8]
:  +- *(5) SortMergeJoin [ID#0L], [ID#6L], Inner
:     :- *(2) Sort [ID#0L ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(ID#0L, 200), ENSURE_REQUIREMENTS, [id=#177]
:     :     +- *(1) Project [ID#0L]
:     :        +- *(1) Filter isnotnull(ID#0L)
:     :           +- *(1) Scan ExistingRDD[ID#0L,col1#1,col2#2]
:     +- *(4) Sort [ID#6L ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(ID#6L, 200), ENSURE_REQUIREMENTS, [id=#183]
:           +- *(3) Filter isnotnull(ID#6L)
:              +- *(3) Scan ExistingRDD[ID#6L,col1#7,col2#8]
:- SortMergeJoin [ID#0L], [ID#6L], LeftAnti
:  :- *(7) Sort [ID#0L ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(ID#0L, 200), ENSURE_REQUIREMENTS, [id=#192]
:  :     +- *(6) Scan ExistingRDD[ID#0L,col1#1,col2#2]
:  +- *(9) Sort [ID#6L ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(ID#6L, 200), ENSURE_REQUIREMENTS, [id=#197]
:        +- *(8) Project [ID#6L]
:           +- *(8) Filter isnotnull(ID#6L)
:              +- *(8) Scan ExistingRDD[ID#6L,col1#7,col2#8]
+- SortMergeJoin [ID#6L], [ID#0L], LeftAnti
   :- *(11) Sort [ID#6L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#6L, 200), ENSURE_REQUIREMENTS, [id=#203]
   :     +- *(10) Scan ExistingRDD[ID#6L,col1#7,col2#8]
   +- *(13) Sort [ID#0L ASC NULLS FIRST], false, 0
      +- ReusedExchange [ID#0L], Exchange hashpartitioning(ID#0L, 200), ENSURE_REQUIREMENTS, [id=#177]
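Rather than counting Exchange steps by eye, you could count them from the plan text. The sketch below assumes the plan has been copied as a string from the .explain() output; count_exchanges is a hypothetical helper, not part of PySpark, and the two excerpts are shortened copies of the plans above.

```python
import re

# Leading tree-drawing characters plus the optional "*(n) " codegen-stage marker.
_PREFIX = re.compile(r"^[\s:+\-]*(?:\*\(\d+\)\s*)?")


def count_exchanges(plan_text):
    """Return (shuffles, reused) counted from a physical plan given as text."""
    shuffles = reused = 0
    for line in plan_text.splitlines():
        operator = _PREFIX.sub("", line)
        # Check ReusedExchange first: its line also mentions the Exchange it reuses.
        if operator.startswith("ReusedExchange"):
            reused += 1
        elif operator.startswith("Exchange"):
            shuffles += 1
    return shuffles, reused


full_outer_excerpt = """\
+- SortMergeJoin [ID#6L], [ID#0L], FullOuter
   :- *(2) Sort [ID#6L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#6L, 200), ENSURE_REQUIREMENTS, [id=#27]
   +- *(4) Sort [ID#0L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ID#0L, 200), ENSURE_REQUIREMENTS, [id=#32]
"""

anti_join_excerpt = """\
+- SortMergeJoin [ID#6L], [ID#0L], LeftAnti
   :- *(11) Sort [ID#6L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#6L, 200), ENSURE_REQUIREMENTS, [id=#203]
   +- *(13) Sort [ID#0L ASC NULLS FIRST], false, 0
      +- ReusedExchange [ID#0L], Exchange hashpartitioning(ID#0L, 200), ENSURE_REQUIREMENTS, [id=#177]
"""

print(count_exchanges(full_outer_excerpt))  # (2, 0)
print(count_exchanges(anti_join_excerpt))   # (1, 1)
```

A ReusedExchange is counted separately because it reads the output of an earlier shuffle instead of shuffling the data again.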

Upvotes: 2
