Axol Otl
Axol Otl

Reputation: 543

SparkJarProcessor in Sagemaker Pipeline

I would like run SparkJarProcessor within Sagemaker Pipeline. After I create an instance of SparkJarProcessor, when I just run the processor, I can specify the jar and the class you I want to execute with the submit_app and submit_class parameters to the run method. e.g.,

processor.run(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

If I want to run it as a step in the pipeline, what arguments can I give to ProcessingStep? According to this documentation, you can call get_run_args on the processor to "get the normalized inputs, outputs and arguments needed when using a SparkJarProcessor in a ProcessingStep", but when I run it like this,

processor.get_run_args(
    submit_app="my.jar", 
    submit_class="program.to.run",
    arguments=['--my_arg', "my_arg"],
    configuration=my_config,
    spark_event_logs_s3_uri=log_path
)

My output looks like this:

RunArgs(code='my.jar', inputs=[<sagemaker.processing.ProcessingInput object at 0x7fc53284a090>], outputs=[<sagemaker.processing.ProcessingOutput object at 0x7fc532845ed0>], arguments=['--my_arg', 'my_arg'])

"program.to.run" is not part of the output. So, assuming code is to specify the jar, what's the normalized version of submit_class?

Upvotes: 1

Views: 593

Answers (2)

For more modern sagemaker sdk versions you can use directly the run method. For example, with '2.120.0' sagemaker sdk version:

from sagemaker.workflow.steps import ProcessingStep
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.pipeline_context import PipelineSession


session = PipelineSession()

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
    sagemaker_session=session,
)

step_preprocess_data = ProcessingStep(
    name="spark-train-data",
    step_args=spark_processor.run(
        submit_app="./code/preprocess.py",
        arguments=[
            "--s3_input_bucket",
            bucket,
            "--s3_input_key_prefix",
            "user_filestore/marti/test-spark",
            "--s3_output_bucket",
            bucket,
            "--s3_output_key_prefix",
            "user_filestore/marti/test-spark",
        ],
        spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, "user_filestore/marti/test-spark"),
    )
)

Upvotes: 0

Payton Staub
Payton Staub

Reputation: 602

When get_run_args or run is called on a SparkJarProcessor, the submit_class is used to set a property on the processor itself which is why you don't see it in the get_run_args output.

That processor property will be used during pipeline definition generation to set the ContainerEntrypoint argument to CreateProcessingJob.

Example:

run_args = spark_processor.get_run_args(
    submit_app="my.jar",
    submit_class="program.to.run",
    arguments=[]
)

step_process = ProcessingStep(
    name="SparkJarProcessStep",
    processor=spark_processor,
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    code=run_args.code
)

pipeline = Pipeline(
    name="myPipeline",
    parameters=[],
    steps=[step_process],
)

definition = json.loads(pipeline.definition())
definition

The output of definition:

...
'Steps': [{'Name': 'SparkJarProcessStep',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 2,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '153931337802.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:2.4-cpu',
     'ContainerEntrypoint': ['smspark-submit',
      '--class',
      'program.to.run',
      '--local-spark-event-logs-dir',
      '/opt/ml/processing/spark-events/',
      '/opt/ml/processing/input/code/my.jar']},
...

Upvotes: 1

Related Questions