Tokyo

Reputation: 823

Update Spark DataFrame based on values of another Spark Dataframe

I have two dataframes, df1 and df2 as shown below:

df1.show()
+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   0    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+


df2.show()
+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | a   | b  |   ?    |
|  C|   ghi  | a   | c  |   ?    |
+---+--------+-----+----+--------+

I would like to update the c5 column in df1, setting it to 1 if the row is referenced in df2. Each record is identified by its c1 and c2 columns.

Below is the desired output. Note that the c5 value of the first record (A, abc) is updated to 1:

+---+--------+-----+----+--------+
|c1 |   c2   |  c3 | c4 |   c5   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   1    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+

Upvotes: 1

Views: 112

Answers (1)

Vamsi Prabhala

Reputation: 49260

Left join df2 onto df1, then use a when(...).otherwise(...) (CASE WHEN) expression for c5 that checks whether a matching df2 row was found.

from pyspark.sql import functions as F

# Alias both frames so their identically named columns can be told apart.
a, b = df1.alias('a'), df2.alias('b')
joined_dfs = a.join(b, (F.col('a.c1') == F.col('b.c1')) & (F.col('a.c2') == F.col('b.c2')), 'left')
# b.c1 is non-null only where the row matched df2; otherwise keep df1's c5.
joined_dfs.select('a.c1', 'a.c2', 'a.c3', 'a.c4',
                  F.when(F.col('b.c1').isNotNull(), 1).otherwise(F.col('a.c5')).alias('c5')) \
          .show()

Upvotes: 2
