Abhishek

CASE WHEN in a MERGE statement on Databricks

I am trying to upsert in Databricks using a MERGE statement in PySpark. I want to know whether expressions (e.g. adding two columns, CASE WHEN) are allowed in the whenMatchedUpdate part. For example, I want to do something like this:

from delta.tables import DeltaTable
from pyspark.sql.functions import broadcast

deltaTableTarget = DeltaTable.forPath(spark, delta_table_path)

(
    deltaTableTarget.alias("TargetTable")
    .merge(
        broadcast(df_transformed.alias("DeltaSource")),
        "DeltaSource.primary_key = TargetTable.primary_key",
    )
    .whenMatchedUpdate(set={
        "aggcount": "DeltaSource.count + TargetTable.count",
        "max_date": "CASE WHEN DeltaSource.max_date > TargetTable.max_date "
                    "THEN DeltaSource.max_date ELSE TargetTable.max_date END",
    })
    .whenNotMatchedInsertAll()
    .execute()
)
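To make the intended upsert semantics concrete, here is a plain-Python sketch (no Spark involved) that models each table as a dict keyed by primary_key. The function names upsert_rows and case_when_max are hypothetical, None stands in for SQL NULL, and for simplicity it assumes both sides keep the running total in a column called count. Matched keys get the summed count and the later max_date via the CASE WHEN logic; unmatched source rows are inserted as-is.

```python
from datetime import date
from typing import Any, Dict

Row = Dict[str, Any]


def case_when_max(src: Any, tgt: Any) -> Any:
    """Model of CASE WHEN src > tgt THEN src ELSE tgt END (a NULL comparison falls to ELSE)."""
    if src is not None and tgt is not None and src > tgt:
        return src
    return tgt


def upsert_rows(target: Dict[Any, Row], source: Dict[Any, Row]) -> Dict[Any, Row]:
    """Hypothetical in-memory model of the MERGE: update matched keys, insert the rest."""
    result = {key: dict(row) for key, row in target.items()}  # copy, leave input untouched
    for key, src in source.items():
        if key in result:  # corresponds to whenMatchedUpdate
            tgt = result[key]
            tgt["count"] = src["count"] + tgt["count"]
            tgt["max_date"] = case_when_max(src["max_date"], tgt["max_date"])
        else:  # corresponds to whenNotMatchedInsertAll
            result[key] = dict(src)
    return result


target = {1: {"count": 3, "max_date": date(2023, 1, 10)}}
source = {
    1: {"count": 2, "max_date": date(2023, 2, 1)},
    2: {"count": 5, "max_date": date(2023, 1, 5)},
}
merged = upsert_rows(target, source)
print(merged[1])  # {'count': 5, 'max_date': datetime.date(2023, 2, 1)}
print(merged[2])  # {'count': 5, 'max_date': datetime.date(2023, 1, 5)}
```

Because the source is a dict, each key appears once; in a real MERGE, Delta rejects a source that has several rows matching the same target row.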

Answers (1)

gamezone25

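If I understand your logic correctly, you just want the larger of the two dates, right? The values in the set dictionary are ordinary Spark SQL expressions, so your CASE WHEN works there, but GREATEST does the same thing more concisely: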

from delta.tables import DeltaTable
from pyspark.sql.functions import broadcast

deltaTableTarget = DeltaTable.forPath(spark, delta_table_path)

(
    # The alias must match the name used in the merge condition and set expressions
    deltaTableTarget.alias("TargetTable")
    .merge(
        broadcast(df_transformed.alias("DeltaSource")),
        "DeltaSource.primary_key = TargetTable.primary_key",
    )
    .whenMatchedUpdate(set={
        "aggcount": "DeltaSource.count + TargetTable.count",
        "max_date": "GREATEST(DeltaSource.max_date, TargetTable.max_date)",
    })
    .whenNotMatchedInsertAll()
    .execute()
)
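One behavioral difference is worth checking before swapping CASE WHEN for GREATEST: Spark's greatest skips NULL arguments, whereas the CASE WHEN version falls through to ELSE when the comparison is NULL. The plain-Python sketch below models both expressions so they can be compared on edge cases; the helper names are hypothetical and None stands in for SQL NULL.

```python
from datetime import date
from typing import Optional


def sql_greatest(*values: Optional[date]) -> Optional[date]:
    """Model of Spark SQL GREATEST: skips NULLs, returns NULL only if all inputs are NULL."""
    non_null = [v for v in values if v is not None]
    return max(non_null) if non_null else None


def sql_case_when(src: Optional[date], tgt: Optional[date]) -> Optional[date]:
    """Model of CASE WHEN src > tgt THEN src ELSE tgt END; a NULL comparison is not true."""
    if src is not None and tgt is not None and src > tgt:
        return src
    return tgt


src, tgt = date(2023, 2, 1), None
print(sql_greatest(src, tgt))   # 2023-02-01
print(sql_case_when(src, tgt))  # None
```

So if the target's max_date can be NULL, GREATEST fills it from the source while the CASE WHEN keeps NULL; when both dates are non-NULL the two expressions agree.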

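If that is not what you want, you can instead chain several whenMatchedUpdate() calls, each with its own condition. Delta evaluates them in the order they are declared, and only the last one may omit its condition: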

from delta.tables import DeltaTable
from pyspark.sql.functions import broadcast

deltaTableTarget = DeltaTable.forPath(spark, delta_table_path)

(
    deltaTableTarget.alias("TargetTable")
    .merge(
        broadcast(df_transformed.alias("DeltaSource")),
        "DeltaSource.primary_key = TargetTable.primary_key",
    )
    # First clause: the source row carries a newer date
    .whenMatchedUpdate(
        condition="DeltaSource.max_date > TargetTable.max_date",
        set={
            "aggcount": "DeltaSource.count + TargetTable.count",
            "max_date": "DeltaSource.max_date",
        },
    )
    # Unconditional clause (must come last): every other matched row keeps its date
    .whenMatchedUpdate(set={
        "aggcount": "DeltaSource.count + TargetTable.count",
        "max_date": "TargetTable.max_date",
    })
    .whenNotMatchedInsertAll()
    .execute()
)
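For each matched row, Delta applies the first whenMatched clause whose condition holds, and leaves the target row unchanged if none does; that is why the unconditional clause goes last. A plain-Python sketch of that dispatch, with hypothetical names, None standing in for NULL, and a simplified count column in place of aggcount:

```python
from datetime import date
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Condition = Optional[Callable[[Row, Row], bool]]
Setter = Callable[[Row, Row], Row]


def apply_matched_clauses(src: Row, tgt: Row, clauses: List[Tuple[Condition, Setter]]) -> Row:
    """Apply the first clause that is unconditional (None) or whose condition is true."""
    for condition, setter in clauses:
        if condition is None or condition(src, tgt):
            return {**tgt, **setter(src, tgt)}
    return tgt  # no clause fired: the target row is left unchanged


def src_is_newer(src: Row, tgt: Row) -> bool:
    # A NULL on either side makes the SQL comparison NULL, which does not count as true.
    if src["max_date"] is None or tgt["max_date"] is None:
        return False
    return src["max_date"] > tgt["max_date"]


clauses: List[Tuple[Condition, Setter]] = [
    (src_is_newer, lambda s, t: {"count": s["count"] + t["count"], "max_date": s["max_date"]}),
    (None, lambda s, t: {"count": s["count"] + t["count"], "max_date": t["max_date"]}),
]

tgt = {"count": 3, "max_date": date(2023, 1, 10)}
print(apply_matched_clauses({"count": 2, "max_date": date(2023, 2, 1)}, tgt, clauses))
# {'count': 5, 'max_date': datetime.date(2023, 2, 1)}
print(apply_matched_clauses({"count": 2, "max_date": date(2023, 1, 1)}, tgt, clauses))
# {'count': 5, 'max_date': datetime.date(2023, 1, 10)}
```

This reproduces the CASE WHEN behavior, including the NULL case: a NULL comparison skips the first clause and lands in the unconditional one.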
