I am trying to upsert into a Delta table in Databricks using a merge statement in PySpark. I want to know whether expressions (e.g. adding two columns, CASE WHEN) are allowed in the whenMatchedUpdate part. For example, I want to do something like this:
from delta.tables import DeltaTable
from pyspark.sql.functions import broadcast

deltaTableTarget = DeltaTable.forPath(spark, delta_table_path)

(
    deltaTableTarget.alias('TargetTable')
    .merge(
        broadcast(df_transformed.alias('DeltaSource')),
        'DeltaSource.primary_key = TargetTable.primary_key'
    )
    .whenMatchedUpdate(set={
        # Add the two counts and keep the later of the two dates.
        'aggcount': 'DeltaSource.count + TargetTable.count',
        'max_date': 'CASE WHEN DeltaSource.max_date > TargetTable.max_date '
                    'THEN DeltaSource.max_date ELSE TargetTable.max_date END'
    })
    .whenNotMatchedInsertAll()
    .execute()
)
If I understand your logic correctly, you can just take the max value of the two columns, right?
deltaTableTarget = DeltaTable.forPath(spark, delta_table_path)

(
    deltaTableTarget.alias('TargetTable')
    .merge(
        broadcast(df_transformed.alias('DeltaSource')),
        'DeltaSource.primary_key = TargetTable.primary_key'
    )
    .whenMatchedUpdate(set={
        'aggcount': 'DeltaSource.count + TargetTable.count',
        'max_date': 'GREATEST(DeltaSource.max_date, TargetTable.max_date)'
    })
    .whenNotMatchedInsertAll()
    .execute()
)
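As an aside, the CASE WHEN expression from the question should work too: the values in the set dictionary (and the merge condition itself) are parsed as Spark SQL expressions, so anything you could write in an UPDATE SET clause is allowed. GREATEST is just more concise here.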
If that is not what you want, you could instead chain multiple whenMatchedUpdate() clauses, putting a condition on the first:
deltaTableTarget = DeltaTable.forPath(spark, delta_table_path)

(
    deltaTableTarget.alias('TargetTable')
    .merge(
        broadcast(df_transformed.alias('DeltaSource')),
        'DeltaSource.primary_key = TargetTable.primary_key'
    )
    # Applied first: the source row is newer, so take its date.
    .whenMatchedUpdate(
        condition='DeltaSource.max_date > TargetTable.max_date',
        set={
            'aggcount': 'DeltaSource.count + TargetTable.count',
            'max_date': 'DeltaSource.max_date'
        }
    )
    # Fallback for all remaining matches: keep the target's date.
    .whenMatchedUpdate(set={
        'aggcount': 'DeltaSource.count + TargetTable.count',
        'max_date': 'TargetTable.max_date'
    })
    .whenNotMatchedInsertAll()
    .execute()
)
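Note that multiple whenMatched clauses are evaluated in the order they are given, and a clause without a condition is only allowed as the last one, so the second whenMatchedUpdate acts as the fallback for matches the first does not catch. If you prefer SQL, here is a minimal sketch of the same upsert as a MERGE statement, assuming df_transformed and delta_table_path from the question (the broadcast hint is dropped in this form):

# Hypothetical SQL equivalent of the GREATEST-based merge above.
df_transformed.createOrReplaceTempView('DeltaSource')

spark.sql(f"""
    MERGE INTO delta.`{delta_table_path}` AS TargetTable
    USING DeltaSource
    ON DeltaSource.primary_key = TargetTable.primary_key
    WHEN MATCHED THEN UPDATE SET
        aggcount = DeltaSource.count + TargetTable.count,
        max_date = GREATEST(DeltaSource.max_date, TargetTable.max_date)
    WHEN NOT MATCHED THEN INSERT *
""")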