Tokyo

Reputation: 823

Update a column in a dataframe, based on the values in another dataframe

I have two dataframes, df1 and df2:

df1.show()
+---+--------+-----+----+--------+
|cA |   cB   |  cC | cD |   cE   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   0    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+


df2.show()
+---+--------+-----+----+--------+
|cA |   cB   |  cH | cI |   cJ   |
+---+--------+-----+----+--------+
|  A|   abc  | a   | b  |   ?    |
|  C|   ghi  | a   | c  |   ?    |
+---+--------+-----+----+--------+

I would like to update the cE column in df1 and set it to 1 if the row is referenced in df2. Each record is identified by the cA and cB columns.

Below is the desired output; note that the cE value of the first record was updated to 1:

+---+--------+-----+----+--------+
|cA |   cB   |  cC | cD |   cE   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   1    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+
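
For reference, the two example DataFrames can be reconstructed with something like the following PySpark sketch; the construction is mine, only the values come from the tables above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

# df1: the frame whose cE column should be updated
df1 = spark.createDataFrame(
    [('A', 'abc', 0.1, 0.0, 0),
     ('B', 'def', 0.15, 0.5, 0),
     ('C', 'ghi', 0.2, 0.2, 1),
     ('D', 'jkl', 1.1, 0.1, 0),
     ('E', 'mno', 0.1, 0.1, 0)],
    ['cA', 'cB', 'cC', 'cD', 'cE'])

# df2: the frame whose (cA, cB) pairs mark which df1 rows to flag
df2 = spark.createDataFrame(
    [('A', 'abc', 'a', 'b', '?'),
     ('C', 'ghi', 'a', 'c', '?')],
    ['cA', 'cB', 'cH', 'cI', 'cJ'])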

Upvotes: 4

Views: 9577

Answers (4)

Prathik Kini

Reputation: 1698

When there is a scenario of updating a column value based on another column, the when clause comes in handy. Please refer to the when and otherwise clauses.

import pyspark.sql.functions as F

# Join df1 and df2 on (cA, cB); matched rows get cE = 1, all others 0.
df3 = (df1.join(df2, (df1.cA == df2.cA) & (df1.cB == df2.cB), "full")
          .withColumn('cE', F.when((df1.cA == df2.cA) & (df1.cB == df2.cB), 1).otherwise(0))
          .select(df1.cA, df1.cB, df1.cC, df1.cD, 'cE'))
df3.show()
+---+---+----+---+---+
| cA| cB|  cC| cD| cE|
+---+---+----+---+---+
|  E|mno| 0.1|0.1|  0|
|  B|def|0.15|0.5|  0|
|  C|ghi| 0.2|0.2|  1|
|  A|abc| 0.1|0.0|  1|
|  D|jkl| 1.1|0.1|  0|
+---+---+----+---+---+

Upvotes: 5

Vladislav Varslavans

Reputation: 2934

Here is my answer.

It's Scala code (sorry for that, I don't have Python installed). Hopefully it helps.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val ss = SparkSession.builder().master("local").getOrCreate()

import ss.implicits._

val seq1 = Seq(
  ("A", "abc", 0.1, 0.0, 0),
  ("B", "def", 0.15, 0.5, 0),
  ("C", "ghi", 0.2, 0.2, 1),
  ("D", "jkl", 1.1, 0.1, 0),
  ("E", "mno", 0.1, 0.1, 0)
)

val seq2 = Seq(
  ("A", "abc", "a", "b", "?"),
  ("C", "ghi", "a", "c", "?")
)


val df1 = ss.sparkContext.makeRDD(seq1).toDF("cA", "cB", "cC", "cD", "cE")
val df2 = ss.sparkContext.makeRDD(seq2).toDF("cA", "cB", "cH", "cI", "cJ")


val joined = df1.join(df2, (df1("cA") === df2("cA")).and(df1("cB") === df2("cB")), "left")

val res = joined.withColumn("newCe",
  when(df2("cA").isNull.and(joined("cE") === lit(0)), lit(0)).otherwise(lit(1)))


res.select(df1("cA"), df1("cB"), df1("cC"), df1("cD"), res("newCe"))
  .withColumnRenamed("newCe", "cE")
  .show

And the output for me is:

+---+---+----+---+---+
| cA| cB|  cC| cD| cE|
+---+---+----+---+---+
|  E|mno| 0.1|0.1|  0|
|  B|def|0.15|0.5|  0|
|  C|ghi| 0.2|0.2|  1|
|  A|abc| 0.1|0.0|  1|
|  D|jkl| 1.1|0.1|  0|
+---+---+----+---+---+
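
For PySpark readers, a rough equivalent of the same left-join approach might look like this (my sketch, untested, assuming the df1 and df2 from the question):

from pyspark.sql import functions as F

# Left join keeps every df1 row; the df2 columns are null where there is no match.
joined = df1.join(df2, (df1['cA'] == df2['cA']) & (df1['cB'] == df2['cB']), 'left')

# Keep 0 only when there is no match in df2 and the original cE was 0; otherwise set 1.
res = joined.withColumn(
    'newCe',
    F.when(df2['cA'].isNull() & (joined['cE'] == F.lit(0)), F.lit(0)).otherwise(F.lit(1)))

res.select(df1['cA'], df1['cB'], df1['cC'], df1['cD'], res['newCe']) \
   .withColumnRenamed('newCe', 'cE') \
   .show()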

Upvotes: 3

wangtianye

Reputation: 306

Try this (note that this is pandas syntax, not Spark):

for i in df2.values:
    df1.loc[(df1.cA==i[0]) & (df1.cB == i[1]),['cE']] = 1
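
For reference, applied to pandas versions of the example frames the loop would run like this (a sketch; the DataFrame construction is mine, since the question's frames are Spark):

import pandas as pd

# Hypothetical pandas versions of the question's example frames.
df1 = pd.DataFrame({'cA': ['A', 'B', 'C', 'D', 'E'],
                    'cB': ['abc', 'def', 'ghi', 'jkl', 'mno'],
                    'cC': [0.1, 0.15, 0.2, 1.1, 0.1],
                    'cD': [0.0, 0.5, 0.2, 0.1, 0.1],
                    'cE': [0, 0, 1, 0, 0]})
df2 = pd.DataFrame({'cA': ['A', 'C'], 'cB': ['abc', 'ghi'],
                    'cH': ['a', 'a'], 'cI': ['b', 'c'], 'cJ': ['?', '?']})

# Set cE = 1 for every df1 row whose (cA, cB) pair appears in df2.
for i in df2.values:
    df1.loc[(df1.cA == i[0]) & (df1.cB == i[1]), ['cE']] = 1

print(df1)   # rows A and C end up with cE = 1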

Upvotes: 0

Simon Delecourt

Reputation: 1599

Using join you can do what you want:

import pandas as pd

df1 = pd.DataFrame({ 'cA' : ['A', 'B', 'C', 'D', 'E'], 'cB' : ['abc', 'def', 'ghi', 'jkl', 'mno'], 'cE' : [0, 0, 1, 0, 0]})
df2 = pd.DataFrame({ 'cA' : ['A', 'C'], 'cB' : ['abc', 'ghi'], 'cE' : ['?', '?']})

# join
df = df1.join(df2.set_index(['cA', 'cB']),  lsuffix='_df1', rsuffix='_df2', on=['cA', 'cB'])

# nan values indicates rows that are not present in both dataframes
df.loc[~df['cE_df2'].isna(), 'cE_df2'] = 1
df.loc[df['cE_df2'].isna(), 'cE_df2'] = 0

df1['cE'] = df['cE_df2']

Output:

    cA  cB  cE
0   A   abc 1
1   B   def 0
2   C   ghi 1
3   D   jkl 0
4   E   mno 0
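
A variant of the same idea uses merge's indicator column instead of the suffix/isna bookkeeping, and only flips matched rows to 1 while leaving the cE of unmatched rows untouched (a sketch, starting from the freshly built df1/df2 above):

# Left merge on the key columns; _merge is 'both' exactly for rows present in df2.
merged = df1.merge(df2[['cA', 'cB']], on=['cA', 'cB'], how='left', indicator=True)
df1.loc[merged['_merge'].eq('both'), 'cE'] = 1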

Upvotes: 0
