USB

Reputation: 6139

How to update Spark DataFrame Column Values of a table from another table based on a condition using Pyspark

I would like to compare two DataFrames in PySpark.

Below is my test dataset (found via Google).

I have two DataFrames:

  1. Base DF
  2. Secondary DF

baseDF

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3500,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Vom,5000,mex,IT,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

secDF

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,ITA,2/11/2019
22,Tom,2500,usa,HRA,2/11/2019
33,Kom,3000,uk,ITA,2/11/2019
44,Nom,4600,can,HRA,2/11/2019
55,Vom,8000,mex,ITA,2/11/2019
77,XYZ,5000,mex,ITA,2/11/2019

I have to compare secDF and baseDF on two keys (No and Name). Where those fields match (I only need the matched records from secDF), I have to update the Sal and Dept fields of baseDF with the values from secDF.

Expected output

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,ITA,2/11/2019
22,Tom,2500,usa,HRA,2/11/2019
33,Kom,3000,uk,ITA,2/11/2019
44,Nom,4600,can,HRA,2/11/2019
55,Vom,8000,mex,ITA,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019

In PySpark, I could use subtract() to find the rows of table1 that are not present in table2 and then unionAll the two tables, or I could use withColumn to overwrite the values that satisfy the condition.

Could someone suggest a good way of doing this?


Upvotes: 0

Views: 2909

Answers (1)

mck

Reputation: 42422

You can do a left join on No and Name and coalesce the resulting Sal and Dept columns, with secdf taking precedence over basedf:

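import pyspark.sql.functions as F

# The left join keeps every row of basedf; the secdf columns are null
# wherever no row with the same (No, Name) exists in secdf.
result = basedf.alias('basedf').join(
    secdf.alias('secdf'),
    ['No', 'Name'],
    'left'
).select(
    # For Sal and Dept, prefer the secdf value and fall back to basedf;
    # every other column is taken from basedf unchanged.
    [F.coalesce('secdf.Sal', 'basedf.Sal').alias('Sal')
     if c == 'Sal'
     else F.coalesce('secdf.Dept', 'basedf.Dept').alias('Dept')
     if c == 'Dept'
     else f'basedf.{c}'
     for c in basedf.columns]
)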

result.show()
+---+----+----+-------+----+---------+
| No|Name| Sal|Address|Dept|Join_Date|
+---+----+----+-------+----+---------+
| 11| Sam|1000|    ind| ITA|2/11/2019|
| 22| Tom|2500|    usa| HRA|2/11/2019|
| 33| Kom|3000|     uk| ITA|2/11/2019|
| 44| Nom|4600|    can| HRA|2/11/2019|
| 55| Vom|8000|    mex| ITA|2/11/2019|
| 66| XYZ|5000|    mex|  IT|2/11/2019|
+---+----+----+-------+----+---------+
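
To make the join-then-coalesce logic easier to follow, here is a rough plain-Python sketch of the same semantics on the sample data. It is not PySpark and is only meant as an illustration: the helper names (to_rows, coalesce, update_from_secondary) are made up for this example, every value is kept as a string, None stands in for a SQL NULL, and it assumes that each (No, Name) pair appears at most once in secDF (a real join would produce one output row per match).

```python
# Plain-Python illustration of "left join on (No, Name), then coalesce Sal and Dept".
# Hypothetical helpers for illustration only; this is not the PySpark API.

COLUMNS = ["No", "Name", "Sal", "Address", "Dept", "Join_Date"]


def to_rows(lines):
    """Turn comma-separated lines into a list of dicts keyed by column name."""
    return [dict(zip(COLUMNS, line.split(","))) for line in lines]


base_rows = to_rows([
    "11,Sam,1000,ind,IT,2/11/2019",
    "22,Tom,2000,usa,HR,2/11/2019",
    "33,Kom,3500,uk,IT,2/11/2019",
    "44,Nom,4000,can,HR,2/11/2019",
    "55,Vom,5000,mex,IT,2/11/2019",
    "66,XYZ,5000,mex,IT,2/11/2019",
])

sec_rows = to_rows([
    "11,Sam,1000,ind,ITA,2/11/2019",
    "22,Tom,2500,usa,HRA,2/11/2019",
    "33,Kom,3000,uk,ITA,2/11/2019",
    "44,Nom,4600,can,HRA,2/11/2019",
    "55,Vom,8000,mex,ITA,2/11/2019",
    "77,XYZ,5000,mex,ITA,2/11/2019",
])


def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def update_from_secondary(base, sec, keys=("No", "Name"), update_cols=("Sal", "Dept")):
    """Keep every base row; overwrite update_cols where sec has a matching key."""
    # Assumption: keys are unique in sec, so a dict lookup can stand in for the join.
    sec_by_key = {tuple(row[k] for k in keys): row for row in sec}
    result = []
    for row in base:
        # An empty dict means "no match", so .get() below yields None (the NULL case).
        match = sec_by_key.get(tuple(row[k] for k in keys), {})
        merged = dict(row)
        for col in update_cols:
            merged[col] = coalesce(match.get(col), row[col])
        result.append(merged)
    return result


result_rows = update_from_secondary(base_rows, sec_rows)
for r in result_rows:
    print(",".join(r[c] for c in COLUMNS))
```

Row 66,XYZ has no partner in secDF (its No differs from 77), so both lookups come back as None and the baseDF values survive; row 77 never appears because the loop only walks baseDF, which is what the left join does.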

Upvotes: 1
