Sajal
Sajal

Reputation: 89

Fill a column in pyspark dataframe, by comparing the data between two different columns in the same dataframe

Aim: To implement this query

select *, 
       case when new_x != x or new_y != y then 'some_status_change' else cdc_status end as cdc_status
       from dataframe where cdc_status = 'noUpdateRequired'

I am trying to implement this logic using pyspark(3.0.0) and spark(2.4.4) and I currently have this

df = df.withColumn("cdc_status",
                           F.when(((F.col('cdc_status') == 'noUpdateRequired')
                                  & (F.col('new_autoapproveind') != F.col('autoapproveind')
                                  | F.col('new_preferpathway') != F.col('preferpathway'))), 'pathwayChange'))

But this is throwing me the following error

ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions

So basically I need a solution where I am able update the column cdc_status, where new_x != x or new_y != y where cdc_status = 'noUpdateRequired'

df.printSchema()

root

 |-- new_autoapproveind: string (nullable = true)
 |-- new_preferpathway: string (nullable = true)
 |-- autoapproveind: string (nullable = true)
 |-- preferpathway: string (nullable = true)
 |-- cdc_status: string (nullable = true)

I have removed some columns while printing the schema because they are sensitive in nature but essentially they also are all string based columns.

I have tried searching everywhere, but not able to find solution for same in pyspark. scala has =!= operator, but the same is not there pyspark.

Although I am able to use when function in other cases, but there it is F.col('cdc_status') != 'some value') where it is a static value, but here I need to comparison between columns, and then fill up / update value in cdc_status column.

Any help will be greatly appreciated!

Upvotes: 0

Views: 856

Answers (1)

mck
mck

Reputation: 42392

You're missing brackets in your conditions, which led to the error. That said, your Python code probably isn't equivalent to the SQL query. You should do a filter before adding the new column, and you should also add an otherwise clause. e.g.

import pyspark.sql.functions as F

df2 = df.filter("cdc_status = 'noUpdateRequired'").withColumn(
    'cdc_status',
    F.when(
        (F.col('new_autoapproveind') != F.col('autoapproveind'))
        | 
        (F.col('new_preferpathway') != F.col('preferpathway')),
        'some_status_change'
    ).otherwise(
        F.col('cdc_status')
    )
)

Upvotes: 1

Related Questions