Reputation: 1082
I have packaged my training code as a Python package and am able to run it as a custom training job on Vertex AI. Now I want to schedule this job to run, say, every 2 weeks and re-train the model. The Scheduling settings in the CustomJobSpec allow only two fields, "timeout" and "restartJobOnWorkerRestart", so recurring runs cannot be set up through the CustomJobSpec itself. One way I could think of to achieve this is to create a Vertex AI pipeline with a single step using the "CustomPythonPackageTrainingJobRunOp" Google Cloud Pipeline Component and then schedule the pipeline to run as I see fit. Are there better alternatives to achieve this?
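For reference, a rough sketch of that single-step pipeline idea (the bucket, package path, module name and container image below are placeholders, and the component parameter names may differ across google_cloud_pipeline_components versions):

from kfp.v2 import dsl, compiler
from google_cloud_pipeline_components import aiplatform as gcc_aip

# Placeholders: adjust bucket, package, module and image to your project.
@dsl.pipeline(name="retrain-model", pipeline_root="gs://my-bucket/pipeline-root")
def retrain_pipeline(project: str = "my-project", location: str = "us-central1"):
    # Single step: run the packaged training code as a Vertex AI custom training job.
    gcc_aip.CustomPythonPackageTrainingJobRunOp(
        project=project,
        location=location,
        display_name="scheduled-training",
        python_package_gcs_uri="gs://my-bucket/trainer-0.1.tar.gz",
        python_module_name="trainer.task",
        container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-3:latest",
        staging_bucket="gs://my-bucket/staging",
    )

# Compile to a pipeline job spec that can then be scheduled.
compiler.Compiler().compile(
    pipeline_func=retrain_pipeline,
    package_path="retrain_pipeline.json",
)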
Edit:
I was able to schedule the custom training job using Cloud Scheduler, but I also found the create_schedule_from_job_spec method of the AIPlatformClient very easy to use with a Vertex AI pipeline. The steps I took to schedule the custom job using Cloud Scheduler in GCP are as follows (link to the Google docs):
You also need a "Cloud Scheduler service account" with the "Cloud Scheduler Service Agent" role granted to it in your project. Although the docs say this should have been set up automatically if you enabled the Cloud Scheduler API after March 19, 2019, this was not the case for me, and I had to add the service account with the role manually.
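In case it helps others who also have to add it manually, a rough sketch of granting that role programmatically (I did it in the Cloud Console; the project ID and project number below are placeholders):

from googleapiclient import discovery

PROJECT_ID = "your-project-id"       # placeholder
PROJECT_NUMBER = "123456789012"      # placeholder

# Grant roles/cloudscheduler.serviceAgent to the Cloud Scheduler service account.
crm = discovery.build("cloudresourcemanager", "v1")
policy = crm.projects().getIamPolicy(resource=PROJECT_ID, body={}).execute()
policy.setdefault("bindings", []).append({
    "role": "roles/cloudscheduler.serviceAgent",
    "members": [
        f"serviceAccount:service-{PROJECT_NUMBER}@gcp-sa-cloudscheduler.iam.gserviceaccount.com"
    ],
})
crm.projects().setIamPolicy(resource=PROJECT_ID, body={"policy": policy}).execute()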
Upvotes: 4
Views: 5958
Reputation: 1552
As per your requirement, these are the possible ways to schedule the training job:
1. Cloud Composer
Cloud Composer is a managed Apache Airflow service that helps you create, schedule, monitor and manage workflows.
You can follow the steps below to schedule your job every two weeks using Composer:
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:
Alternatively, you can use the unix-cron string format (* * * * *) for the schedule_interval.
I.e. in your case, to run the job on the 1st and 15th of every month (roughly every two weeks), the cron expression would be: 0 0 1,15 * *
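For example, the same DAG definition with the cron form instead of a timedelta:

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval='0 0 1,15 * *',
        default_args=default_dag_args) as dag: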
You can pass the parameters required by the custom job to the PythonOperator using the op_args and op_kwargs arguments.
After the DAG file is written, you need to upload it to the dags/ folder inside the Composer environment's bucket.
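One way to upload the DAG file programmatically (the bucket name is a placeholder for your Composer environment's bucket; gsutil or the Cloud Console work just as well):

from google.cloud import storage

# Copy sample_dag.py into the dags/ folder of the Composer environment bucket.
storage.Client().bucket("your-composer-environment-bucket") \
    .blob("dags/sample_dag.py") \
    .upload_from_filename("sample_dag.py")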
You can check the status of the scheduled DAG in the Airflow UI.
The scheduled DAG file would look like this:
sample_dag.py :
from __future__ import print_function

import datetime

from google.cloud import aiplatform
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

    def create_custom_job_sample(
        project: str,
        display_name: str,
        container_image_uri: str,
        location: str,
        api_endpoint: str,
    ):
        # The AI Platform services require regional API endpoints.
        client_options = {"api_endpoint": api_endpoint}
        # Initialize client that will be used to create and send requests.
        # This client only needs to be created once, and can be reused for multiple requests.
        client = aiplatform.gapic.JobServiceClient(client_options=client_options)
        # CustomJob payload: a single worker pool running the training container.
        custom_job = {
            "display_name": display_name,
            "job_spec": {
                "worker_pool_specs": [
                    {
                        "machine_spec": {
                            "machine_type": "n1-standard-4",
                            "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                            "accelerator_count": 1,
                        },
                        "replica_count": 1,
                        "container_spec": {
                            "image_uri": container_image_uri,
                            "command": [],
                            "args": [],
                        },
                    }
                ]
            },
        }
        parent = f"projects/{project}/locations/{location}"
        response = client.create_custom_job(parent=parent, custom_job=custom_job)
        print("response:", response)

    # The hello_python task creates the Vertex AI custom job on each scheduled run.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={
            "project": "your_project",
            "display_name": "name",
            "container_image_uri": "uri path",
            "location": "us-central1",
            "api_endpoint": "us-central1-aiplatform.googleapis.com",
        })

    # Likewise, the goodbye_bash task runs a simple Bash command.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo job scheduled')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
2. Cloud Scheduler: To schedule the job using Cloud Scheduler, you need to create a Cloud Scheduler job that triggers the training run on your schedule, for example through an HTTP target that calls the Vertex AI customJobs.create REST endpoint.
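A rough sketch of such a configuration using the google-cloud-scheduler Python client (the project, training package, image and service account below are placeholders):

import json

from google.cloud import scheduler_v1

project = "your_project"        # placeholder
location = "us-central1"

client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{project}/locations/{location}"

# Request body for the Vertex AI customJobs.create REST call (placeholders).
custom_job_body = {
    "displayName": "scheduled-training",
    "jobSpec": {
        "workerPoolSpecs": [{
            "machineSpec": {"machineType": "n1-standard-4"},
            "replicaCount": 1,
            "pythonPackageSpec": {
                "executorImageUri": "us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-3:latest",
                "packageUris": ["gs://your-bucket/trainer-0.1.tar.gz"],
                "pythonModule": "trainer.task",
            },
        }]
    },
}

job = {
    "name": f"{parent}/jobs/retrain-every-two-weeks",
    "schedule": "0 0 1,15 * *",   # midnight on the 1st and 15th of each month
    "time_zone": "UTC",
    "http_target": {
        "uri": f"https://{location}-aiplatform.googleapis.com/v1/{parent}/customJobs",
        "http_method": scheduler_v1.HttpMethod.POST,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(custom_job_body).encode(),
        # Service account with permission to create Vertex AI custom jobs.
        "oauth_token": {"service_account_email": "scheduler-sa@your_project.iam.gserviceaccount.com"},
    },
}

client.create_job(parent=parent, job=job)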
3. Scheduling a recurring pipeline run using the Kubeflow Pipelines SDK:
You can schedule a recurring pipeline run using Python and the Kubeflow Pipelines SDK.
from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule="0 0 1,15 * *",
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS,
)
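Here, COMPILED_PIPELINE_PATH is the JSON job spec produced by the KFP v2 compiler, for example (assuming a @dsl.pipeline-decorated function, named retrain_pipeline here for illustration):

from kfp.v2 import compiler

COMPILED_PIPELINE_PATH = "retrain_pipeline.json"
compiler.Compiler().compile(
    pipeline_func=retrain_pipeline,   # your @dsl.pipeline-decorated pipeline function
    package_path=COMPILED_PIPELINE_PATH,
)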
Upvotes: 4