user1119283

Reputation: 431

How to add or update a column value using joins in pyspark

I need to update a column in a DataFrame, df, based on an equality condition against another DataFrame, ds. To reproduce the issue, copy and paste the code below into a notebook.

Example: df is created as follows:

data = [["101", "sravan", "vignan"],
        ["102", "ramya", "vvit"],
        ["103", "rohith", "klu"],
        ["104", "sridevi", "vignan"],
        ["105", "gnanesh", "iit"]]
columns = ['rollNo', 'name', 'lastName']
df = spark.createDataFrame(data=data, schema=columns, verifySchema=True)

The other DataFrame is ds:

ds = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("107", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("108", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("109", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
    ],
    ["rollNo", "creation_date", "last_update_time"]
)

Next, a leftanti join creates dt, which holds the rows of df whose rollNo does not appear in ds:

dt = df.join(ds, ["rollNo"], "leftanti")
dt.show(5,False)
df.show(5,False)

The result of the leftanti join, dt, looks like this:

+------+-------+--------+
|rollNo|name   |lastName|
+------+-------+--------+
|104   |sridevi|vignan  |
|105   |gnanesh|iit     |
+------+-------+--------+



The original df DataFrame is:

+------+-------+--------+
|rollNo|name   |lastName|
+------+-------+--------+
|101   |sravan |vignan  |
|102   |ramya  |vvit    |
|103   |rohith |klu     |
|104   |sridevi|vignan  |
|105   |gnanesh|iit     |
+------+-------+--------+

The issue: when I try to add a column to df based on an equality condition between the two DataFrames, as shown below, I don't get the expected result. I expect is_deleted to be true only for the rows whose rollNo also appears in dt, and false for the rest:

from pyspark.sql.functions import when
df.withColumn('is_deleted', when(dt.rollNo == df.rollNo, True).otherwise(False)).show(5,False)

Response received:
+------+-------+--------+----------+
|rollNo|name   |lastName|is_deleted|
+------+-------+--------+----------+
|101   |sravan |vignan  |true      |
|102   |ramya  |vvit    |true      |
|103   |rohith |klu     |true      |
|104   |sridevi|vignan  |true      |
|105   |gnanesh|iit     |true      |
+------+-------+--------+----------+

Response expected:
+------+-------+--------+----------+
|rollNo|name   |lastName|is_deleted|
+------+-------+--------+----------+
|101   |sravan |vignan  |false     |
|102   |ramya  |vvit    |false     |
|103   |rohith |klu     |false     |
|104   |sridevi|vignan  |true      | # this should be updated to true
|105   |gnanesh|iit     |true      | # this should be updated to true
+------+-------+--------+----------+

Upvotes: 0

Views: 1834

Answers (2)

Usergtwuser

Reputation: 13

We can collect the rollNo values from dt into a temporary list and then use the isin method to flag the matching rows. Something like below:

from pyspark.sql.functions import when

# Rows of df whose rollNo has no match in ds
dt = df.join(ds, ["rollNo"], "leftanti")
# Collect the unmatched keys to the driver (reasonable only for small key sets)
rolls = [row['rollNo'] for row in dt.collect()]
print(rolls)
dt.show(5, False)
df.show(5, False)
df.withColumn('is_deleted', when(df.rollNo.isin(rolls), True).otherwise(False)).show(5, False)

Upvotes: 1

wwnde

Reputation: 26676

Use a left_outer join instead. Code below:

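from pyspark.sql.functions import col, when
# creation_date serves as a marker: it is null for rows of df with no match in ds
dt = df.join(ds.select('rollNo', col('creation_date').alias('is_deleted')), ["rollNo"], "left_outer").withColumn('is_deleted', when(col('is_deleted').isNull(), True).otherwise(False))
dt.show(truncate=False)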

+------+-------+--------+----------+
|rollNo|name   |lastName|is_deleted|
+------+-------+--------+----------+
|101   |sravan |vignan  |false     |
|102   |ramya  |vvit    |false     |
|103   |rohith |klu     |false     |
|104   |sridevi|vignan  |true      |
|105   |gnanesh|iit     |true      |
+------+-------+--------+----------+

Upvotes: 1
