Reputation: 455
I am running automated model-training jobs on Azure Databricks clusters using the databricks-api package. My script first checks for a cluster by name: if it doesn't exist, the script creates a new one; otherwise it returns the ID of the existing one. The script then checks for the job by name: if the job doesn't exist, it creates a new one; otherwise it returns the ID of the existing job, attaches a cluster to it, and runs it. After the job completes, my script deletes all the clusters.
Issue: the first run works fine, but on subsequent runs, when I create new clusters to attach to the jobs I already created, I get an error that the cluster doesn't exist, because the jobs keep looking for the old (now deleted) clusters. Is there any way to update/assign a new cluster to an already existing job before running it?
def authenticateDBWs():
    from databricks_api import DatabricksAPI
    db = DatabricksAPI(
        host="https:",
        token=""
    )
    return db
def createCluster(db, clusterName):
    print("Cluster not found... Now trying to create new cluster: ", clusterName)
    # Single-node GPU cluster used for training
    cluster = db.cluster.create_cluster(
        num_workers=0,
        cluster_name=clusterName,
        spark_version='10.1.x-gpu-ml-scala2.12',
        spark_conf={
            "spark.databricks.cluster.profile": "singleNode",
            "spark.master": "local[*]"
        },
        node_type_id="Standard_NC4as_T4_v3",
        driver_node_type_id="Standard_NC4as_T4_v3",
        autotermination_minutes=20,
        enable_elastic_disk=True,
    )
    clusters = db.cluster.list_clusters(headers=None)
    # Exact name match ('in' would also match substrings)
    cluster = [d for d in clusters["clusters"] if d['cluster_name'] == clusterName]
    return cluster
def getCluster(db, clusterName):
    print("Trying to get cluster: ", clusterName)
    clusters = db.cluster.list_clusters(headers=None)
    clusterArray = [d for d in clusters["clusters"] if d['cluster_name'] == clusterName]
    if len(clusterArray) == 0:
        clusterArray = createCluster(db, clusterName)
    return clusterArray[0]
def startCluster(db, clusterName):
    import time
    print("Trying to start cluster: ", clusterName)
    cluster = getCluster(db, clusterName)
    print("Cluster state ...", cluster["state"])
    if cluster["state"] == 'TERMINATED':
        db.cluster.start_cluster(cluster_id=cluster["cluster_id"])
        # Refresh so the polling loop below sees the PENDING state
        cluster = getCluster(db, clusterName)
    while cluster["state"] == 'PENDING':
        time.sleep(30)
        cluster = getCluster(db, clusterName)
    if cluster["state"] == 'RUNNING':
        status = cluster["state"]
    else:
        status = 'ERROR'
    return status
def getJob(db, jobName, clusterId, modelClass, classInstance):
    print("Get Job: ", jobName)
    jobs = db.jobs.list_jobs(headers=None)
    job = [d for d in jobs["jobs"] if d["settings"]["name"] == jobName]
    if len(job) == 0:
        print("Job does not exist, going to create it...")
        job = createJob(db, jobName, clusterId, modelClass, classInstance)
        jobId = job["job_id"]
    else:
        print("Job already exists")
        jobId = job[0]["job_id"]
    return jobId
def createJob(db, jobName, clusterId, modelClass, classInstance):
    print("Creating Job: " + jobName + " on " + clusterId)
    trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
    job = db.jobs.create_job(
        name=jobName,
        existing_cluster_id=clusterId,
        email_notifications=None,
        notebook_task={"notebook_path": trainModelPath,
                       "base_parameters": {"modelClass": modelClass, "classInstance": classInstance}},
        timeout_seconds=54000)
    return job
def runJob(db, jobId, modelClass, classInstance):
    runId = db.jobs.run_now(
        job_id=jobId,
        notebook_params={"modelClass": modelClass, "classInstance": classInstance},
    )
    return runId
def getRunResult(db, runId):
    import time
    run = db.jobs.get_run(runId)
    runState = run["state"]["life_cycle_state"]
    # Poll until the run leaves the PENDING/RUNNING life-cycle states
    while runState == 'RUNNING' or runState == 'PENDING':
        print("Training run in progress.. status: ", runState)
        time.sleep(30)
        run = db.jobs.get_run(runId)
        runState = run["state"]["life_cycle_state"]
    runOutput = db.jobs.get_run_output(runId)
    print('#########runOutput######## ', runOutput)
    #print("Run output:", runOutput["metadata"])
    return runOutput
def runTrainingJob(arguments):
    modelClass = arguments[0]
    classInstance = arguments[1]
    db = authenticateDBWs()
    clusterName = 'train_covid_clas' + str(modelClass) + '_ins' + str(classInstance)
    print('REQUIRED CLUSTER NAME: ', clusterName)
    jobName = 'Covid_Class_autojob_' + str(modelClass) + '_Model_V' + str(classInstance)
    print('REQUIRED JOB NAME: ', jobName)
    cluster = getCluster(db, clusterName)
    print('1. getCluster: ', cluster)
    clusterStatus = startCluster(db, clusterName)
    print("2. Cluster status:", clusterStatus)
    print("run the training jobs")
    jobId = getJob(db, jobName, cluster["cluster_id"], modelClass, classInstance)
    runId = runJob(db, jobId, modelClass, classInstance)
    runResult = getRunResult(db, runId["run_id"])
    return runResult
Upvotes: 1
Views: 1669
Reputation: 87069
Yes, it's possible. You need to update the corresponding setting in the job definition using either the full update (reset) or partial update APIs, so just add a step to your code that performs that update before triggering the run.
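For illustration, here's a minimal sketch using the same databricks-api wrapper. It assumes the wrapper exposes the Jobs API get and reset endpoints as db.jobs.get_job and db.jobs.reset_job (check the signatures in your installed version), and updateJobCluster is a hypothetical helper name:

def updateJobCluster(db, jobId, clusterId):
    # Hypothetical helper: point an existing job at a freshly created cluster.
    # Fetch the current definition, swap the cluster id, then do a full update.
    job = db.jobs.get_job(jobId)
    newSettings = job["settings"]
    newSettings["existing_cluster_id"] = clusterId
    # Full update: replaces the job settings wholesale (Jobs API "reset")
    db.jobs.reset_job(job_id=jobId, new_settings=newSettings)

Calling this between getJob and runJob ensures each run targets the cluster you just created, even though the cluster from the previous run was deleted.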
But the main question is: why do you need to run the job on an existing cluster at all? Jobs are meant to run completely automatically, and it's much cheaper (almost 4x) to run a job on a job cluster (created automatically for the run) than on an interactive cluster. Consider switching to that method; it removes your original problem entirely, because the cluster definition is attached to the job itself.
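As a sketch of that approach, your createJob could pass a new_cluster spec (mirroring the single-node configuration from your createCluster) instead of existing_cluster_id. The field names below follow the Jobs API 2.0 new_cluster structure; double-check them against your workspace:

def createJob(db, jobName, modelClass, classInstance):
    # Sketch: job-cluster variant - no pre-created cluster needed
    trainModelPath = '/Shared/Databricks-Mlops/Covid-Classifer/train/trainModel'
    job = db.jobs.create_job(
        name=jobName,
        new_cluster={
            "num_workers": 0,
            "spark_version": "10.1.x-gpu-ml-scala2.12",
            "spark_conf": {
                "spark.databricks.cluster.profile": "singleNode",
                "spark.master": "local[*]"
            },
            "node_type_id": "Standard_NC4as_T4_v3",
            "driver_node_type_id": "Standard_NC4as_T4_v3"
        },
        notebook_task={"notebook_path": trainModelPath,
                       "base_parameters": {"modelClass": modelClass, "classInstance": classInstance}},
        timeout_seconds=54000)
    return job

With a job cluster attached, createCluster, startCluster, and the cleanup step all become unnecessary: Databricks provisions the cluster for each run and tears it down afterwards.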
P.S. Another option is to use the Databricks Terraform Provider, which will create all objects and handle all dependencies automatically.
Upvotes: 1