cs0815

Reputation: 17388

Start, monitor and define the script of a SageMaker Processing job from a local machine

I am looking at this, which all makes sense. Let us focus on this bit of code:

from sagemaker.processing import ProcessingInput, ProcessingOutput

# sklearn_processor is the SKLearnProcessor instance created earlier in the example
sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(source="s3://your-bucket/path/to/your/data", destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/test"),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe() 

Here preprocessing.py obviously has to end up in the cloud. I am curious: could one also put the script(s) into an S3 bucket and trigger the job remotely? I can easily do this with hyperparameter optimisation, which does not require dedicated scripts, though, as I use an OOTB training image.
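Something along these lines, assuming the code parameter can point at an S3 URI (I have not verified this):

sklearn_processor.run(
    code="s3://your-bucket/scripts/preprocessing.py",  # script fetched from S3?
    inputs=[
        ProcessingInput(source="s3://your-bucket/path/to/your/data", destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test_data", source="/opt/ml/processing/test"),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
)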

With hyperparameter optimisation, I can fire off the job like so:

from time import gmtime, strftime

tuning_job_name = "amazing-hpo-job-" + strftime("%d-%H-%M-%S", gmtime())

import boto3

smclient = boto3.Session().client("sagemaker")
# tuning_job_config and training_job_definition are defined earlier, as in the AWS example
smclient.create_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name,
    HyperParameterTuningJobConfig=tuning_job_config,
    TrainingJobDefinition=training_job_definition
)

and then monitor the job's progress:

import boto3
from pprint import pprint

smclient = boto3.Session().client("sagemaker")

tuning_job_result = smclient.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name
)

status = tuning_job_result["HyperParameterTuningJobStatus"]
if status != "Completed":
    print("Reminder: the tuning job has not been completed.")

job_count = tuning_job_result["TrainingJobStatusCounters"]["Completed"]
print("%d training jobs have completed" % job_count)

objective = tuning_job_result["HyperParameterTuningJobConfig"]["HyperParameterTuningJobObjective"]
is_minimize = objective["Type"] != "Maximize"
objective_name = objective["MetricName"]

if tuning_job_result.get("BestTrainingJob", None):
    print("Best model found so far:")
    pprint(tuning_job_result["BestTrainingJob"])
else:
    print("No training jobs have reported results yet.") 

I would think that starting and monitoring a SageMaker Processing job from a local machine should be possible, just as with an HPO job, but what about the script(s)? Ideally I would like to develop and test them locally and then run them remotely. I hope this makes sense.
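For completeness, this is roughly what I would expect the monitoring side to look like, mirroring the HPO case (describe_processing_job is the boto3 call I have in mind; the job name below is made up):

import boto3

smclient = boto3.Session().client("sagemaker")

# hypothetical job name, for illustration only
processing_job_result = smclient.describe_processing_job(
    ProcessingJobName="my-preprocessing-job"
)
print(processing_job_result["ProcessingJobStatus"])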

Upvotes: 0

Views: 1276

Answers (1)

Marc Karp

Reputation: 1314

I'm not sure I understand the comparison to a Tuning Job.

Based on what you have described, preprocessing.py is in this case actually stored locally. The SageMaker SDK will upload it to S3 so the remote Processing Job can access it. I suggest launching the job and then taking a look at its inputs in the SageMaker Console.
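For example, after launching the job you can look up where the SDK staged the script. A minimal sketch, assuming the job was started with sklearn_processor.run(...) as in your question and relying on the SDK's convention of staging the script as a ProcessingInput named code:

# Sketch: find where the SDK uploaded the local script
desc = sklearn_processor.jobs[-1].describe()

for proc_input in desc["ProcessingInputs"]:
    if proc_input["InputName"] == "code":  # input name the SDK uses for the script
        print("Script was uploaded to:", proc_input["S3Input"]["S3Uri"])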

If you want to test the Processing Job locally, you can do so using Local Mode. This essentially imitates the job on your machine, which helps with debugging the script before kicking off a remote Processing Job. Note that Docker is required to make use of Local Mode.

Example code for local mode:

from sagemaker.local import LocalSession
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

sagemaker_session = LocalSession()
sagemaker_session.config = {'local': {'local_code': True}}

# For local training a dummy role will be sufficient
role = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'

processor = ScriptProcessor(
    command=['python3'],
    # local Docker image tag built beforehand (see the SageMaker local mode examples)
    image_uri='sagemaker-scikit-learn-processing-local',
    role=role,
    instance_count=1,
    instance_type='local')

processor.run(
    code='processing_script.py',
    inputs=[ProcessingInput(
        source='./input_data/',
        destination='/opt/ml/processing/input_data/')],
    outputs=[ProcessingOutput(
        output_name='word_count_data',
        source='/opt/ml/processing/processed_data/')],
    arguments=['job-type', 'word-count'])

preprocessing_job_description = processor.jobs[-1].describe()
output_config = preprocessing_job_description['ProcessingOutputConfig']

print(output_config)

for output in output_config['Outputs']:
    if output['OutputName'] == 'word_count_data':
        word_count_data_file = output['S3Output']['S3Uri']

print('Output file is located on: {}'.format(word_count_data_file))
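Once the script works in Local Mode, the same run call can be pointed at a remote instance. A sketch, assuming an SKLearnProcessor with a placeholder role, instance type and framework version:

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Placeholder role and instance type; the SDK uploads processing_script.py
# from the local machine to S3 automatically.
remote_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::111111111111:role/MySageMakerExecutionRole',
    instance_count=1,
    instance_type='ml.m5.xlarge')

remote_processor.run(
    code='processing_script.py',  # local path, staged in S3 by the SDK
    inputs=[ProcessingInput(
        source='s3://your-bucket/input_data/',
        destination='/opt/ml/processing/input_data/')],
    outputs=[ProcessingOutput(
        output_name='word_count_data',
        source='/opt/ml/processing/processed_data/')])

The job can then be started and monitored from the local machine just like your HPO example, e.g. via describe().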


Upvotes: 1
