Reputation: 431
I need to update a column in one DataFrame, df, based on an equality condition against another DataFrame, ds.
To reproduce the issue, copy the code below into a notebook.
Example: df is created as follows:
from pyspark.sql.functions import when  # used by the withColumn call further down

data = [["101", "sravan", "vignan"],
        ["102", "ramya", "vvit"],
        ["103", "rohith", "klu"],
        ["104", "sridevi", "vignan"],
        ["105", "gnanesh", "iit"]]
columns = ['rollNo', 'name', 'lastName']
df = spark.createDataFrame(data=data, schema=columns, verifySchema=True)
The other DataFrame is ds:
ds = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("107", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("108", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("109", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
    ],
    ["rollNo", "creation_date", "last_update_time"]
)
Now we perform a left anti join to create dt, which holds the rows of df that have no match in ds:
dt = df.join(ds, ["rollNo"], "leftanti")
dt.show(5,False)
df.show(5,False)
The result of the left anti join, dt, is:
+------+-------+--------+
|rollNo|name   |lastName|
+------+-------+--------+
|104   |sridevi|vignan  |
|105   |gnanesh|iit     |
+------+-------+--------+
The original df is:
+------+-------+--------+
|rollNo|name   |lastName|
+------+-------+--------+
|101   |sravan |vignan  |
|102   |ramya  |vvit    |
|103   |rohith |klu     |
|104   |sridevi|vignan  |
|105   |gnanesh|iit     |
+------+-------+--------+
The issue arises when I add another column to df based on an equality condition between the two DataFrames, as shown below. The result is not what I expect, which is to set is_deleted to true only for the rows whose rollNo also appears in dt, and to false for the rest:
df.withColumn('is_deleted', when(dt.rollNo == df.rollNo, True).otherwise(False)).show(5,False)
Response received:
+------+-------+--------+----------+
|rollNo|name   |lastName|is_deleted|
+------+-------+--------+----------+
|101   |sravan |vignan  |true      |
|102   |ramya  |vvit    |true      |
|103   |rohith |klu     |true      |
|104   |sridevi|vignan  |true      |
|105   |gnanesh|iit     |true      |
+------+-------+--------+----------+
Response expected:
+------+-------+--------+----------+
|rollNo|name   |lastName|is_deleted|
+------+-------+--------+----------+
|101   |sravan |vignan  |false     |
|102   |ramya  |vvit    |false     |
|103   |rohith |klu     |false     |
|104   |sridevi|vignan  |true      | # this should be updated to true
|105   |gnanesh|iit     |true      | # this should be updated to true
+------+-------+--------+----------+
Upvotes: 0
Views: 1834
Reputation: 13
We can collect the rollNo values from dt into a temporary list and then use the isin method to flag the matching rows. Something like this:
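from pyspark.sql.functions import when

rolls = []
dt = df.join(ds, ["rollNo"], "leftanti")
dataCollect = dt.collect()  # brings the anti-join rows to the driver
for row in dataCollect:
    print(row['rollNo'])
    rolls.append(row['rollNo'])
print(rolls)
dt.show(5,False)
df.show(5,False)
# True where rollNo is one of the values missing from ds, False otherwise
df.withColumn('is_deleted', when(df.rollNo.isin(rolls), True).otherwise(False)).show(5,False)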
Upvotes: 1
Reputation: 26676
Use a left_outer join instead. Code below:
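from pyspark.sql.functions import col, when
# creation_date is joined in under the name is_deleted; it is null where ds has no matching rollNo
dt = (
    df.join(ds.select('rollNo', col('creation_date').alias('is_deleted')), ["rollNo"], "left_outer")
    .withColumn('is_deleted', when(col('is_deleted').isNull(), True).otherwise(False))
)
dt.show(truncate=False)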
+------+-------+--------+----------+
|rollNo|name   |lastName|is_deleted|
+------+-------+--------+----------+
|101   |sravan |vignan  |false     |
|102   |ramya  |vvit    |false     |
|103   |rohith |klu     |false     |
|104   |sridevi|vignan  |true      |
|105   |gnanesh|iit     |true      |
+------+-------+--------+----------+
Upvotes: 1