Naveen Thimmappa

Reputation: 21

Compare two Dataframe and run "Update Else Insert" in Pyspark

I am new to Python and couldn't find the exact answer I am looking for in other blogs, hence I am posting this as a new question.

I have two DataFrames, created as below:

df_hive.show() 
+--------+----------+-------+ 
|BATCH_ID|SRC_SYS_ID|ACT_IND| 
+--------+----------+-------+ 
|     100|      SYS1|      N| 
|     101|      SYS2|      N| 
|     102|      SYS3|      N| 
|     103|      SYS4|      Y| 
+--------+----------+-------+

df_orc.show() 
+--------+----------+-------+ 
|BATCH_ID|SRC_SYS_ID|ACT_IND| 
+--------+----------+-------+ 
|      99|      SYS0|      N|
|     100|      SYS1|      N| 
|     101|      SYS2|      N| 
|     102|      SYS3|      Y| 
+--------+----------+-------+

Expected Results for "df_orc"

+--------+----------+-------+ 
|BATCH_ID|SRC_SYS_ID|ACT_IND| 
+--------+----------+-------+ 
|      99|      SYS0|      N|
|     100|      SYS1|      N| 
|     101|      SYS2|      N| 
|     102|      SYS3|      N| 
|     103|      SYS4|      Y| 
+--------+----------+-------+

What I am trying to achieve is to join the two DataFrames on BATCH_ID and SRC_SYS_ID: if a match is found (and ACT_IND differs), update ACT_IND in the second DataFrame "df_orc"; if no match is found, insert the row into "df_orc" as a new record.
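To make the merge rule concrete, here is a plain-Python sketch of the "update else insert" semantics, no Spark required. The row values are the sample data above; the variable names (`hive_rows`, `orc_rows`, `merged`) are purely illustrative, and the key is assumed to be the (BATCH_ID, SRC_SYS_ID) pair:

```python
# Sketch of "update else insert" keyed on (BATCH_ID, SRC_SYS_ID).
# Illustrative only -- the real data lives in Spark DataFrames.
hive_rows = [(100, 'SYS1', 'N'), (101, 'SYS2', 'N'),
             (102, 'SYS3', 'N'), (103, 'SYS4', 'Y')]
orc_rows = [(99, 'SYS0', 'N'), (100, 'SYS1', 'N'),
            (101, 'SYS2', 'N'), (102, 'SYS3', 'Y')]

# Start from df_orc, then let df_hive overwrite matches and add new keys
merged = {(b, s): a for b, s, a in orc_rows}
for b, s, a in hive_rows:
    merged[(b, s)] = a          # update on match, insert otherwise

result = sorted((b, s, a) for (b, s), a in merged.items())
# result == [(99, 'SYS0', 'N'), (100, 'SYS1', 'N'), (101, 'SYS2', 'N'),
#            (102, 'SYS3', 'N'), (103, 'SYS4', 'Y')]
```

Note that BATCH_ID 102 ends up with 'N' (the df_hive value), which is the expected result shown above.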

Appreciate your help in advance.

Upvotes: 1

Views: 299

Answers (1)

Venu N

Reputation: 11

valuesA = [(100,'SYS1','N'),(101,'SYS2','N'),(102,'SYS3','N'),(103,'SYS4','Y')]
TableA = spark.createDataFrame(valuesA, ['BATCH_ID','SRC_SYS_ID','ACT_IND'])

valuesB = [(99,'SYS0','N'),(100,'SYS1','N'),(101,'SYS2','N'),(102,'SYS3','Y')]
TableB = spark.createDataFrame(valuesB, ['BATCH_ID','SRC_SYS_ID','ACT_IND'])

ta = TableA.alias('ta')
tb = TableB.alias('tb')

# BATCH_IDs present only in TableB -- rows to keep as-is,
# since TableA's version wins wherever the key matches
diff = (tb.select('BATCH_ID')
        .subtract(ta.select('BATCH_ID'))
        .rdd.map(lambda x: x[0]).collect())

# Take every row from TableA (the "update" side), then append the
# TableB rows whose BATCH_IDs TableA does not have (the "insert" side)
ta.union(tb[tb.BATCH_ID.isin(diff)]).orderBy('BATCH_ID').show()

Upvotes: 1
