irum zahra
irum zahra

Reputation: 455

Assign different cluster to existing jobs on azure databricks before running it

I am running automated jobs for training models using the databricks-api package on Azure Databricks clusters. My script checks for a cluster: if it doesn't exist, the script creates a new one; otherwise it returns the id of the existing one. After that, my script looks up the job by name: if the job doesn't exist it creates a new one; if it exists, it returns the id of the existing job, attaches a cluster to it and then runs it. After job completion my script deletes all the clusters. Issue: the first run works fine, but on subsequent runs — when I create new clusters to attach to the jobs I already created — I get an error that the cluster doesn't exist, because the jobs keep looking for the old clusters. Is there any way to update/assign a new cluster to an already-existing job before running it?

def authenticateDBWs():
  """Build and return an authenticated Databricks API client.

  NOTE(review): host/token are placeholders here — supply real workspace
  credentials, ideally from environment variables or a secret store rather
  than hard-coding them.
  """
  from databricks_api import DatabricksAPI

  client = DatabricksAPI(
      host="https:",
      token=""
  )
  return client

def createCluster(db, clusterName):
  """Create a single-node GPU cluster named *clusterName*.

  Args:
    db: authenticated DatabricksAPI client.
    clusterName: exact name for the new cluster.

  Returns:
    list: cluster-description dict(s) whose name equals *clusterName*
    (callers index element 0).
  """
  print("Cluster not found... Now trying to create new cluster: ", clusterName)
  db.cluster.create_cluster(
    num_workers=0,
    cluster_name=clusterName,
    spark_version='10.1.x-gpu-ml-scala2.12',
    spark_conf={
        # Single-node profile: the driver doubles as the (only) executor.
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]"
    },
    node_type_id="Standard_NC4as_T4_v3",
    driver_node_type_id="Standard_NC4as_T4_v3",
    autotermination_minutes=20,
    enable_elastic_disk=True,
    )
  # Bug fix: the original filtered with `d['cluster_name'] in clusterName`,
  # i.e. substring matching, which can return an unrelated cluster whose
  # name happens to be contained in clusterName. Compare for equality.
  clusters = db.cluster.list_clusters(headers=None)
  return [d for d in clusters["clusters"] if d['cluster_name'] == clusterName]
  
def getCluster(db, clusterName):
  """Return the cluster dict whose name equals *clusterName*, creating the
  cluster first if no such cluster exists.

  Args:
    db: authenticated DatabricksAPI client.
    clusterName: exact cluster name to look up.

  Returns:
    dict: the matching cluster description.
  """
  print("Trying to get cluster: ", clusterName)
  clusters = db.cluster.list_clusters(headers=None)
  # Bug fix: exact-name match. The original substring test
  # (`d['cluster_name'] in clusterName`) could match the wrong cluster,
  # e.g. a cluster literally named 'train' against any 'train_*' lookup.
  matches = [d for d in clusters["clusters"] if d['cluster_name'] == clusterName]
  if not matches:
    matches = createCluster(db, clusterName)
  return matches[0]


def startCluster (db, clusterName):
  """Start *clusterName* if it is terminated and wait until it is running.

  Args:
    db: authenticated DatabricksAPI client.
    clusterName: name of the cluster to start.

  Returns:
    str: 'RUNNING' when the cluster came up, otherwise 'ERROR'.
  """
  print("Trying to start cluster: ", clusterName)
  import time
  cluster = getCluster(db, clusterName)

  print("Cluster state ...", cluster["state"])
  if cluster["state"] == 'TERMINATED':
    db.cluster.start_cluster(cluster_id=cluster["cluster_id"])
    # Bug fix: refresh the snapshot after requesting the start. The original
    # kept the stale 'TERMINATED' state, so the PENDING wait loop below was
    # skipped and the function wrongly reported 'ERROR' for a cluster that
    # was in fact starting up.
    cluster = getCluster(db, clusterName)

  # Poll every 30s while the cluster is still coming up.
  while cluster["state"] == 'PENDING':
    time.sleep(30)
    cluster = getCluster(db, clusterName)

  return cluster["state"] if cluster["state"] == 'RUNNING' else 'ERROR'
  

def getJob(db , jobName , clusterId, modelClass, classInstance):
  """Return the job id for *jobName*, creating the job if it does not exist.

  If the job already exists, its `existing_cluster_id` setting is re-pointed
  at *clusterId* via the Jobs reset (full update) API. This fixes the
  original failure mode: the stored job kept referencing a previously
  deleted cluster, so `run_now` failed with "cluster doesn't exist".

  Args:
    db: authenticated DatabricksAPI client.
    jobName: exact job name to look up.
    clusterId: id of the (freshly created/started) cluster to attach.
    modelClass, classInstance: forwarded to createJob for new jobs.

  Returns:
    The job id.
  """
  print("Get Job: ", jobName)
  jobs = db.jobs.list_jobs(headers=None)
  # Bug fix: exact-name match instead of the original substring test
  # (`d["settings"]["name"] in jobName`), which could hit the wrong job.
  matching = [d for d in jobs["jobs"] if d["settings"]["name"] == jobName]
  if not matching:
    print("Job does not exist, going to create it...")
    job = createJob(db, jobName, clusterId, modelClass, classInstance)
    jobId = job["job_id"]
  else:
    print("Job already exists")
    jobId = matching[0]["job_id"]
    # Re-attach the current cluster to the stored job definition before
    # running it, so stale cluster ids never reach run_now.
    settings = matching[0]["settings"]
    settings["existing_cluster_id"] = clusterId
    db.jobs.reset_job(job_id=jobId, new_settings=settings)
  return jobId
  

def createJob(db, jobName , clusterId, modelClass, classInstance):
  """Create a notebook-task training job bound to an existing cluster.

  Returns the create-job API response (contains 'job_id').
  """
  print("Creating Job: "+jobName+" on "+clusterId)
  trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
  notebookTask = {
      "notebook_path": trainModelPath,
      "base_parameters": {"modelClass": modelClass, "classInstance": classInstance},
  }
  return db.jobs.create_job(
      name=jobName,
      existing_cluster_id=clusterId,
      email_notifications=None,
      notebook_task=notebookTask,
      timeout_seconds=54000,
  )


def runJob(db, jobId, modelClass, classInstance):
  """Trigger a run of *jobId*, passing the notebook parameters.

  Returns the run-now API response (contains 'run_id').
  """
  params = {"modelClass": modelClass, "classInstance": classInstance}
  return db.jobs.run_now(job_id=jobId, notebook_params=params)


def getRunResult(db, runId):
  """Poll run *runId* until it leaves RUNNING/PENDING, then fetch its output.

  Blocks in 30-second intervals while the run is in progress.
  """
  import time

  while True:
    run = db.jobs.get_run(runId)
    runState = run["state"]["life_cycle_state"]
    if runState not in ('RUNNING', 'PENDING'):
      break
    print("Training run in progress.. status: ", runState)
    time.sleep(30)

  runOutput = db.jobs.get_run_output(runId)
  print('#########runOutput######## ', runOutput)
  return runOutput
    
def runTrainingJob(arguments):
  """End-to-end driver: ensure cluster and job exist, run training, return output.

  Args:
    arguments: sequence with the model class at index 0 and the class
      instance at index 1.

  Returns:
    The run-output dict produced by getRunResult.
  """
  modelClass = arguments[0]
  classInstance = arguments[1]

  db = authenticateDBWs()
  clusterName = 'train_covid_clas' + str(modelClass) + '_ins' + str(classInstance)
  print('REQUIRED CLUSTER NAME: ', clusterName)
  jobName = 'Covid_Class_autojob_' + str(modelClass) + '_Model_V' + str(classInstance)
  print('REQUIRED JOB NAME: ', jobName)

  cluster = getCluster(db, clusterName)
  print('1. getCluster: ', cluster)
  clusterStatus = startCluster(db, clusterName)
  print("2. Cluster status:", clusterStatus)

  print("run the training jobs")
  jobId = getJob(db, jobName, cluster["cluster_id"], modelClass, classInstance)
  runId = runJob(db, jobId, modelClass, classInstance)
  return getRunResult(db, runId["run_id"])

return runId

Upvotes: 1

Views: 1669

Answers (1)

Alex Ott
Alex Ott

Reputation: 87069

Yes, it's possible to do — you need to update the corresponding setting in the job definition using either the full-update (reset) or partial-update API. Just add a corresponding step to your code to perform that update before triggering the run.

But the main question is - why do you need to run job on the existing cluster? Jobs are meant to be run completely automatically, and it's much cheaper (almost 4x) to run job on a job cluster (created automatically) than run on interactive clusters. Consider switching to that method because it will remove your original problem completely as job will have cluster definition attached to it.

P.S. Another option is to use Databricks Terraform Provider that will create all objects & handle all dependencies automatically.

Upvotes: 1

Related Questions