Reputation: 543
I would like to run SparkJarProcessor within a SageMaker Pipeline. After I create an instance of SparkJarProcessor, when I just run the processor on its own, I can specify the jar and the class I want to execute with the submit_app and submit_class parameters of the run method, e.g.:
processor.run(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=["--my_arg", "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path,
)
If I want to run it as a step in the pipeline, what arguments can I give to ProcessingStep? According to this documentation, you can call get_run_args on the processor to "get the normalized inputs, outputs and arguments needed when using a SparkJarProcessor in a ProcessingStep", but when I run it like this,
processor.get_run_args(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=["--my_arg", "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path,
)
my output looks like this:
RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])
"program.to.run" is not part of the output. So, assuming code is how the jar is specified, what is the normalized version of submit_class?
Upvotes: 1
Views: 593
Reputation: 116
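With more recent versions of the SageMaker Python SDK, you can pass the result of the run method directly to the step. When the processor is created with a PipelineSession, run does not start a job; it returns the arguments that ProcessingStep accepts as step_args. For example, with SDK version 2.120.0: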
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.pipeline_context import PipelineSession

# role and bucket are assumed to be defined earlier
session = PipelineSession()

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session=session,
)

step_preprocess_data = ProcessingStep(
    name="spark-train-data",
    # With a PipelineSession, run() returns step arguments instead of starting a job
    step_args=spark_processor.run(
        submit_app="./code/preprocess.py",
        arguments=[
            "--s3_input_bucket",
            bucket,
            "--s3_input_key_prefix",
            "user_filestore/marti/test-spark",
            "--s3_output_bucket",
            bucket,
            "--s3_output_key_prefix",
            "user_filestore/marti/test-spark",
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, "user_filestore/marti/test-spark"),
    ),
)
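Since the question is about SparkJarProcessor rather than PySparkProcessor, here is a rough sketch of the same step_args pattern with a jar. It assumes SparkJarProcessor.run behaves under a PipelineSession the same way PySparkProcessor.run does above, which I have not verified against every SDK version. The helper name build_spark_jar_step, the job and step names, and the instance settings are made up for illustration.

```python
def build_spark_jar_step(role, jar_path, main_class, event_logs_uri):
    """Hypothetical helper: build a ProcessingStep that runs main_class from jar_path."""
    # Imported lazily so that defining the helper does not itself require the SDK.
    from sagemaker.spark.processing import SparkJarProcessor
    from sagemaker.workflow.pipeline_context import PipelineSession
    from sagemaker.workflow.steps import ProcessingStep

    processor = SparkJarProcessor(
        base_job_name="sm-spark-jar",  # illustrative name
        framework_version="3.1",
        role=role,
        instance_count=2,
        instance_type="ml.m5.xlarge",
        sagemaker_session=PipelineSession(),
    )
    # Assumption: under a PipelineSession, run() returns step arguments
    # instead of starting a processing job.
    step_args = processor.run(
        submit_app=jar_path,
        submit_class=main_class,
        arguments=["--my_arg", "my_arg"],
        spark_event_logs_s3_uri=event_logs_uri,
    )
    return ProcessingStep(name="spark-jar-step", step_args=step_args)
```

With the names from the question, the call would look like build_spark_jar_step(role, "my.jar", "program.to.run", log_path), and the returned step goes into the steps list of the Pipeline.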
Upvotes: 0
Reputation: 602
When get_run_args or run is called on a SparkJarProcessor, submit_class is used to set a property on the processor itself, which is why you don't see it in the get_run_args output.
That processor property is then used during pipeline definition generation to set the ContainerEntrypoint argument of CreateProcessingJob.
Example:
import json

from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

# spark_processor is an existing SparkJarProcessor instance
run_args = spark_processor.get_run_args(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=[],
)

step_process = ProcessingStep(
    name="SparkJarProcessStep",
    processor=spark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    code=run_args.code,
)

pipeline = Pipeline(
    name="myPipeline",
    parameters=[],
    steps=[step_process],
)

definition = json.loads(pipeline.definition())
definition
The output of definition:
...
'Steps': [{'Name': 'SparkJarProcessStep',
'Type': 'Processing',
'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
'InstanceCount': 2,
'VolumeSizeInGB': 30}},
'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
'ContainerEntrypoint': ['smspark-submit',
'--class',
'program.to.run',
'--local-spark-event-logs-dir',
'/opt/ml/processing/spark-events/',
'/opt/ml/processing/input/code/my.jar']},
...
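If you want to check this programmatically rather than by reading the dump, something like the sketch below should work. It only walks the parsed definition dict and looks for the value after '--class' in ContainerEntrypoint. The function name find_submit_class is made up, and the sample dict is trimmed by hand from the output above rather than produced by the SDK.

```python
def find_submit_class(definition, step_name):
    """Return the value following '--class' in the step's ContainerEntrypoint, or None."""
    for step in definition.get("Steps", []):
        if step.get("Name") != step_name:
            continue
        entrypoint = (
            step.get("Arguments", {})
            .get("AppSpecification", {})
            .get("ContainerEntrypoint", [])
        )
        if "--class" in entrypoint:
            idx = entrypoint.index("--class")
            if idx + 1 < len(entrypoint):
                return entrypoint[idx + 1]
    return None


# Hand-trimmed sample mirroring the definition output shown above
sample = {
    "Steps": [
        {
            "Name": "SparkJarProcessStep",
            "Type": "Processing",
            "Arguments": {
                "AppSpecification": {
                    "ContainerEntrypoint": [
                        "smspark-submit",
                        "--class",
                        "program.to.run",
                        "--local-spark-event-logs-dir",
                        "/opt/ml/processing/spark-events/",
                        "/opt/ml/processing/input/code/my.jar",
                    ]
                }
            },
        }
    ]
}

print(find_submit_class(sample, "SparkJarProcessStep"))  # prints program.to.run
```

Against a real pipeline you would pass json.loads(pipeline.definition()) instead of sample.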
Upvotes: 1