Reputation: 11
Azure ML SDKv2: Scheduled Pipeline Job Not Executing Main Script Properly
Problem:
I'm working on migrating an Azure ML project from SDKv1 to SDKv2. While my scheduled pipeline job runs successfully, the output of my main script (main.py) remains identical to the initial job submission, indicating the script isn't being re-executed.
Code:
main.py: Generates a random string on each execution and prints it along with the current timestamp.
Publish.py: Creates the environment and component, defines the pipeline, submits it as a job, and schedules recurring executions of main.py.
Expected Behavior:
Each scheduled job execution should generate a new random string and print it along with the current timestamp.
Actual Behavior:
All scheduled job executions produce the same random string and timestamp as the initial job submission.
Code Snippets:
main.py:
import string
import random
import datetime
# Generate a random 10-character string using random.choices()
res = ''.join(random.choices(string.ascii_letters, k=10))
message_to_main = f"Hello world from scheduler... {res}"
print(message_to_main)
print(f"Script execution time: {datetime.datetime.utcnow()}")
Publish.py:
# ... (relevant code for environment setup, component registration, pipeline definition, job submission, and scheduling)
# Import required libraries
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
from azure.ai.ml import command
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.constants import TimeZone
from azure.ai.ml.entities import (
Environment, BuildContext,
JobSchedule,
RecurrenceTrigger,
RecurrencePattern,
)
from azure.core.exceptions import ResourceNotFoundError
from azure.ai.ml.exceptions import ValidationException
def main():
    """
    The main function performs the following actions:
      - Initializes the MLClient handle
      - Creates or updates the environment for the main component, built from the
        Dockerfile and conda.yml
      - Creates or updates the component that executes the main.py script (the
        script is simple and has no inputs or outputs)
      - Sets up the pipeline using the main component
      - Submits the pipeline for execution as a job
      - Schedules the pipeline as a recurring job every 5 minutes
    """
    # Set variables
    COMPUTE = "sdkv2-test-cluster"
    ENV_PATH = "."
    ENV_NAME = "helloapp_sdkv2_env_02"
    MAIN_COMPONENT_NAME = "helloapp_sdkv2_main_02"
    COMPONENT_CODE_PATH = "src"
    APP_PIPELINE_NAME = "helloapp_sdkv2_scheduler_pipeline_02"
    PIPELINE_JOB_EXPERIMENT = "helloapp_sdkv2_scheduler_experiment_02"
    JOB_SCHEDULE_NAME = "helloapp_sdkv2_scheduler_experiment_02"

    # Get a handle to the workspace
    subscription_id = "<SUBSCRIPTION ID>"
    resource_group = "<RESOURCE GROUP NAME>"
    workspace = "<AML WORKSPACE NAME>"
    ml_client = MLClient(
        DefaultAzureCredential(), subscription_id, resource_group, workspace
    )
    try:
        print(f'Starting to create environment: {ENV_NAME}')
        env_docker_context = Environment(
            build=BuildContext(path=ENV_PATH),
            name=ENV_NAME,
            description=f"Environment for {MAIN_COMPONENT_NAME}",
        )
        job_run_env = ml_client.environments.create_or_update(env_docker_context)
        print(f'Environment creation job started for: {ENV_NAME}')
    except Exception as e:
        print(e)
        raise e

    try:
        component = command(
            name=MAIN_COMPONENT_NAME,
            compute=COMPUTE,
            # The source folder of the component
            code=COMPONENT_CODE_PATH,
            command="python main.py",
            environment=job_run_env,
        )
        # Register component for reusability
        registered_component = ml_client.create_or_update(component.component)
        print(
            f"Component {registered_component.name} with Version {registered_component.version} is registered"
        )
    except Exception as e:
        print(e)
        raise e
    # Create a simple pipeline
    @pipeline(name=APP_PIPELINE_NAME, compute=COMPUTE)
    def hello_sdkv2_scheduler_pipeline():
        _ = registered_component()

    app_pipeline = hello_sdkv2_scheduler_pipeline()

    # Submit pipeline as a job for execution
    submitted_job = ml_client.jobs.create_or_update(
        app_pipeline, experiment_name=PIPELINE_JOB_EXPERIMENT
    )
    print(submitted_job.id)

    # Schedule pipeline job for recurring execution every 5 mins
    recurrence_trigger = RecurrenceTrigger(
        frequency="minute",
        interval=5,
        time_zone=TimeZone.CENTRAL_AMERICA_STANDARD_TIME,
    )
    job_schedule = JobSchedule(
        name=JOB_SCHEDULE_NAME,
        trigger=recurrence_trigger,
        create_job=app_pipeline,
    )
    job_schedule = ml_client.schedules.begin_create_or_update(
        schedule=job_schedule
    ).result()
    print(job_schedule)
if __name__ == "__main__":
    main()
Dockerfile:
FROM mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu22.04
ENV AZUREML_CONDA_ENVIRONMENT_PATH /azureml-envs/helloappsdkv2
COPY conda.yml .
# Create the conda environment from the YAML file
RUN conda env create -f conda.yml -p $AZUREML_CONDA_ENVIRONMENT_PATH
# Prepend path to AzureML conda environment
ENV PATH $AZUREML_CONDA_ENVIRONMENT_PATH/bin:$PATH
RUN rm conda.yml
# This is needed for mpi to locate libpython
ENV LD_LIBRARY_PATH $AZUREML_CONDA_ENVIRONMENT_PATH/lib:$LD_LIBRARY_PATH
conda.yml:
channels:
  - defaults
dependencies:
  - python=3.12.7
  - pip
  - pip:
      - azure-ai-ml>=1.15.0
Question:
Why is the scheduled pipeline job not re-executing main.py, and why does it produce the same output as the initial job submission? What steps can I take to ensure each scheduled execution runs main.py independently and generates a new random string?
Additional Information:
The output of main.py is identical for each execution. Outputs from the initial job and the scheduled executions:
Main Script Output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369
Main Script Output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369
Main Script Output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369
Main Script Output: Hello world from scheduler... VUQcztoumM Script execution time: 2024-10-22 14:25:55.560369
Please help me understand why the scheduler is not executing main.py correctly and how to fix it. If it is caching the result, what is the workaround for pipelines that do not have any inputs or outputs?
Upvotes: 1
Views: 118
Reputation: 11
Solved: There is a setting in the pipeline definition, force_rerun, which is False by default. Setting it to True solved the rerun issue:
app_pipeline.settings.force_rerun = True
force_rerun (boolean): Whether to force rerun the whole pipeline. The default value is False, which means that by default the pipeline tries to reuse the output of the previous job if it meets the reuse criteria. If set to True, all steps in the pipeline will rerun.
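For context, here is a minimal sketch of where the setting fits in the question's Publish.py, reusing the names from the question; it goes on the pipeline object after it is built and before it is submitted and passed to the schedule:

app_pipeline = hello_sdkv2_scheduler_pipeline()

# Disable output reuse so every run, including the scheduled ones, re-executes main.py
app_pipeline.settings.force_rerun = True

# Submit and schedule as before; since the schedule is created with
# create_job=app_pipeline, the jobs it triggers carry this setting too
submitted_job = ml_client.jobs.create_or_update(
    app_pipeline, experiment_name=PIPELINE_JOB_EXPERIMENT
)

Alternatively, if only a single step should never be reused, the command component can be marked as non-deterministic (is_deterministic=False) so its results are not cached, but the pipeline-level force_rerun shown above is the simpler fix here.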
Upvotes: 0