Reputation: 13
We have a requirement to restart Azure Databricks notebooks run from a Databricks workflow from the point of failure. I have gone through the repair-and-rerun concept in Databricks and did a small POC to restart a task from the point of failure, and it works fine.
But in the real scenario, I am running PySpark notebooks that do DML operations like update and insert using Spark SQL. If any of these fails and we do a repair run, it will run the notebook from the beginning, so there is a chance of duplicate inserts. Is there any way I can avoid this? I was thinking about using MERGE to handle insert and update based on the primary key. Will that resolve the issue of restarting from the point of failure using repair and rerun?
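For example, I am considering rewriting a plain INSERT as a Spark SQL MERGE keyed on the primary key, along the lines of the sketch below (the table and column names are placeholders, not my real schema):

spark.sql("""
    MERGE INTO target_table AS t
    USING staging_table AS s
    ON t.id = s.id                 -- primary key match
    WHEN MATCHED THEN
      UPDATE SET t.name = s.name, t.amount = s.amount
    WHEN NOT MATCHED THEN
      INSERT (id, name, amount) VALUES (s.id, s.name, s.amount)
""")

The idea is that rerunning this statement with the same staging data should not create duplicates, since rows that already exist are updated instead of inserted again.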
Upvotes: 0
Views: 543
Reputation: 8140
You can do the merge operation using the code below.
from delta.tables import *

# Target table and the table containing the updates
deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

# Upsert: update rows whose id already exists, insert the rest
deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
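If you want to carry over all columns without listing them individually, the Delta Lake Python API also has whenMatchedUpdateAll and whenNotMatchedInsertAll. A minimal sketch, assuming the same two tables as above:

# Same upsert as above, but copying every column from the source row
deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()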
For more information about merge, refer to this documentation.
If you use the upsert (merge) approach from the beginning, then when a failure occurs and you do a repair and rerun, the rerun will only upsert the data, so you will not end up with duplicate inserts.
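As a quick illustration of why this is safe to rerun, the sketch below (with hypothetical paths and a tiny synthetic dataset) runs the same merge twice; the second run matches every row and simply updates it again, so the row count does not change:

from delta.tables import DeltaTable

# Hypothetical target table keyed on `id` (illustration only)
spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"]) \
    .write.format("delta").mode("overwrite").save("/tmp/delta/demo_target")

updates = spark.createDataFrame([(2, "b2"), (3, "c")], ["id", "value"])
target = DeltaTable.forPath(spark, "/tmp/delta/demo_target")

def upsert():
    target.alias("t").merge(updates.alias("u"), "t.id = u.id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

upsert()   # first run: updates id=2, inserts id=3
upsert()   # rerun (e.g. after repair): no new rows are added

print(spark.read.format("delta").load("/tmp/delta/demo_target").count())  # 3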
Upvotes: 0