Adigkar

Reputation: 13

Restarting failed tasks in Databricks workflow

We have a requirement to restart Azure Databricks notebooks, run from a Databricks workflow, from the point of failure. I have gone through the repair and rerun concept in Databricks and did a small PoC to restart a task from the point of failure, and it works fine.

https://www.databricks.com/blog/2022/05/06/save-time-and-money-on-data-and-ml-workflows-with-repair-and-rerun.html
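Besides the workflow UI, a repair can also be requested through the Jobs REST API. The sketch below assumes the Jobs API 2.1 `runs/repair` endpoint with its `run_id` and `rerun_all_failed_tasks` fields; the workspace URL, token and run ID are hypothetical placeholders. It only builds the request and does not send it.

```python
import json
import urllib.request


def build_repair_request(host: str, token: str, run_id: int) -> urllib.request.Request:
    """Build (but do not send) a Jobs API 2.1 request that reruns the failed tasks of a job run."""
    payload = {
        "run_id": run_id,
        # Ask Databricks to rerun only the failed tasks rather than the whole job.
        "rerun_all_failed_tasks": True,
    }
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.1/jobs/runs/repair",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical values: use your own workspace URL, access token and the failed run's ID.
req = build_repair_request("https://adb-1234567890123456.7.azuredatabricks.net", "<token>", 42)
# urllib.request.urlopen(req)  # uncomment to actually submit the repair
```

The repair works at task granularity: a rerun task executes its notebook from the top again, which is why the DML inside the notebook has to be safe to repeat.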

But in the real scenario, I am running PySpark notebooks that perform DML operations such as UPDATE and INSERT using Spark SQL. If any of these fails and we do a repair run, the notebook runs again from the beginning, so there is a chance of duplicate inserts. Is there any way I can avoid this? I was thinking about using MERGE to handle inserts and updates based on the primary key. Will that resolve the issue of restarting from the point of failure using repair and rerun?

Upvotes: 0

Views: 543

Answers (1)

JayashankarGS

Reputation: 8140

You can do the merge operation using the code below.

from delta.tables import DeltaTable

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
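Since the notebooks already run their DML through Spark SQL, the same upsert can also be written as a `MERGE INTO` statement. The helper below is a hypothetical sketch that only builds the SQL string; the table names are placeholders, and it assumes source and target have the same columns so that `UPDATE SET *` and `INSERT *` apply.

```python
def build_merge_sql(target: str, source: str, key_cols: list[str]) -> str:
    """Build a Delta Lake MERGE statement that upserts `source` into `target` on the key columns."""
    if not key_cols:
        raise ValueError("at least one key column is required")
    # Join condition on the primary key, e.g. "t.id = s.id AND t.region = s.region".
    on = " AND ".join(f"t.{c} = s.{c}" for c in key_cols)
    return (
        f"MERGE INTO {target} AS t "
        f"USING {source} AS s "
        f"ON {on} "
        "WHEN MATCHED THEN UPDATE SET * "  # existing key: overwrite with the new values
        "WHEN NOT MATCHED THEN INSERT *"  # new key: insert the row
    )


sql = build_merge_sql("people", "people_updates", ["id"])
# In a Databricks notebook: spark.sql(sql)
```

With the Python API used above, `whenMatchedUpdateAll()` and `whenNotMatchedInsertAll()` play the same role as `UPDATE SET *` and `INSERT *` when the column names match, which avoids listing every column twice.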

For more information about merging, refer to this documentation.

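If you use upserts (MERGE) from the start, then when a task fails and you repair and rerun it, the rerun only upserts the data: rows already written by the failed attempt are matched on the key and updated instead of being inserted again, so no duplicates are created.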
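To see why this avoids duplicates, here is a pure-Python model (no Spark involved) of a task that fails after its write and is then rerun from the beginning. The table is modelled as a list of rows, and the illustrative `insert` and `upsert` functions mimic INSERT INTO and a MERGE keyed on `id`.

```python
def insert(table: list[dict], rows: list[dict]) -> None:
    """Plain INSERT: always appends, like INSERT INTO."""
    table.extend(dict(r) for r in rows)


def upsert(table: list[dict], rows: list[dict], key: str = "id") -> None:
    """MERGE-like upsert: update the row with the same key, otherwise append it."""
    index = {row[key]: i for i, row in enumerate(table)}
    for r in rows:
        if r[key] in index:
            table[index[r[key]]] = dict(r)  # WHEN MATCHED THEN UPDATE
        else:
            index[r[key]] = len(table)
            table.append(dict(r))  # WHEN NOT MATCHED THEN INSERT


def run_task(table: list[dict], write, rows: list[dict], fail_after_write: bool = False) -> None:
    """Simulate a notebook task: write the rows, then optionally fail at a later step."""
    write(table, rows)
    if fail_after_write:
        raise RuntimeError("simulated failure after the write was committed")


batch = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
results = {}

for write in (insert, upsert):
    table: list[dict] = []
    try:
        run_task(table, write, batch, fail_after_write=True)  # first attempt fails
    except RuntimeError:
        pass
    run_task(table, write, batch)  # repair run: the notebook starts again from the top
    results[write.__name__] = len(table)

print(results)  # {'insert': 4, 'upsert': 2}
```

This holds as long as the rerun reads the same source rows; Delta MERGE also expects at most one source row per key and fails if several source rows match the same target row.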

Upvotes: 0
