mskoryk

Reputation: 506

Retrieve Sagemaker Model from Model Registry in Sagemaker Pipelines

I am implementing an inference pipeline with AWS SageMaker Pipelines using the Python SDK. I have a Model Package Group in the Model Registry and I want to use the latest approved model version from that group for inference (batch-transform inference). However, I don't know which pipeline step to use to retrieve the latest approved model version. As a workaround, I tried to use sagemaker.workflow.lambda_step.LambdaStep to retrieve the model version ARN and then sagemaker.ModelPackage to define a sagemaker.workflow.steps.CreateModelStep. The minimal working code is the following:

import sagemaker
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.workflow.pipeline import Pipeline

from sagemaker import ModelPackage
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.inputs import CreateModelInput


role = sagemaker.get_execution_role()
sagemaker_sess = sagemaker.Session()

# create lambda function that retrieves latest approved model version ARN
function_name = "inference-pipeline-lambda-step"
func = Lambda(
    function_name=function_name,
    execution_role_arn=role,
    script="get_model_arn.py",
    handler="get_model_arn.lambda_handler",
    timeout=600,
    memory_size=10240,
)
output_model_arn = LambdaOutput(output_name="model_package_arn", output_type=LambdaOutputTypeEnum.String)

# define Lambda step that retrieves latest approved model version ARN
step_get_model_arn = LambdaStep(
    name="GetModelARN",
    lambda_func=func,
    inputs={},
    outputs=[output_model_arn],
)

# use output of the previous Lambda step to define a sagemaker Model
model = ModelPackage(
    role=role, 
    model_package_arn=step_get_model_arn.properties.Outputs['model_package_arn'], 
    sagemaker_session=sagemaker_sess
)

# define CreateModelStep so that the model can be later used in Transform step for batch-transform inference
inputs = CreateModelInput(
        instance_type='ml.m5.large',
    )

step_create_model = CreateModelStep(
    name="create-inference-model",
    model=model,
    inputs=inputs,
)

# Pipeline definition and creation/update
pipeline = Pipeline(
    name='well-logs-inference-pipeline',
    parameters=[],
    steps=[
        step_get_model_arn,
        step_create_model
    ],
)

pipeline.upsert(role_arn=role)

This gives an error

TypeError: expected string or bytes-like object

As I understand it, the error happens in the model = ModelPackage(...) expression: ModelPackage requires model_package_arn to be a string, but here it is a sagemaker.workflow.properties.Properties object instead.

Is there a way to retrieve a model version from a Model Package Group so that it can later be used in a TransformStep?
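For reference, the failure can be reproduced without AWS: the LambdaStep output is a placeholder that is only resolved at pipeline execution time, and the SDK ends up feeding it to re.match (see the bottom of the traceback), which accepts only strings. A minimal pure-Python sketch of that failure mode (the dict here is an illustrative stand-in, not the actual Properties class):

```python
import re

# Illustrative stand-in for the unresolved pipeline placeholder; the real
# object is a sagemaker.workflow.properties.Properties instance, not a dict.
placeholder = {"Get": "Steps.GetModelARN.OutputParameters['model_package_arn']"}

# The same regex sagemaker.utils.base_name_from_image applies to the image URI.
try:
    re.match(r"^(.+/)?([^:/]+)(:[^:]+)?$", placeholder)
except TypeError as exc:
    print(exc)  # -> expected string or bytes-like object
```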

The full traceback is here

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-6-63bdf0b9bf74> in <module>
     65 )
     66 
---> 67 pipeline.upsert(role_arn=role)

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in upsert(self, role_arn, description, tags, parallelism_config)
    217         """
    218         try:
--> 219             response = self.create(role_arn, description, tags, parallelism_config)
    220         except ClientError as e:
    221             error = e.response["Error"]

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in create(self, role_arn, description, tags, parallelism_config)
    114         """
    115         tags = _append_project_tags(tags)
--> 116         kwargs = self._create_args(role_arn, description, parallelism_config)
    117         update_args(
    118             kwargs,

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in _create_args(self, role_arn, description, parallelism_config)
    136             A keyword argument dict for calling create_pipeline.
    137         """
--> 138         pipeline_definition = self.definition()
    139         kwargs = dict(
    140             PipelineName=self.name,

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in definition(self)
    299     def definition(self) -> str:
    300         """Converts a request structure to string representation for workflow service calls."""
--> 301         request_dict = self.to_request()
    302         request_dict["PipelineExperimentConfig"] = interpolate(
    303             request_dict["PipelineExperimentConfig"], {}, {}

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in to_request(self)
     89             if self.pipeline_experiment_config is not None
     90             else None,
---> 91             "Steps": list_to_request(self.steps),
     92         }
     93 

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/utilities.py in list_to_request(entities)
     40     for entity in entities:
     41         if isinstance(entity, Entity):
---> 42             request_dicts.append(entity.to_request())
     43         elif isinstance(entity, StepCollection):
     44             request_dicts.extend(entity.request_dicts())

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py in to_request(self)
    212     def to_request(self) -> RequestType:
    213         """Gets the request structure for `ConfigurableRetryStep`."""
--> 214         step_dict = super().to_request()
    215         if self.retry_policies:
    216             step_dict["RetryPolicies"] = self._resolve_retry_policy(self.retry_policies)

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py in to_request(self)
    101             "Name": self.name,
    102             "Type": self.step_type.value,
--> 103             "Arguments": self.arguments,
    104         }
    105         if self.depends_on:

/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py in arguments(self)
    411                 container_defs=self.model.prepare_container_def(
    412                     instance_type=self.inputs.instance_type,
--> 413                     accelerator_type=self.inputs.accelerator_type,
    414                 ),
    415                 vpc_config=self.model.vpc_config,

/opt/conda/lib/python3.7/site-packages/sagemaker/model.py in prepare_container_def(self, instance_type, accelerator_type, serverless_inference_config)
    411         """
    412         deploy_key_prefix = fw_utils.model_code_key_prefix(
--> 413             self.key_prefix, self.name, self.image_uri
    414         )
    415         deploy_env = copy.deepcopy(self.env)

/opt/conda/lib/python3.7/site-packages/sagemaker/fw_utils.py in model_code_key_prefix(code_location_key_prefix, model_name, image)
    393         str: the key prefix to be used in uploading code
    394     """
--> 395     training_job_name = sagemaker.utils.name_from_image(image)
    396     return "/".join(filter(None, [code_location_key_prefix, model_name or training_job_name]))
    397 

/opt/conda/lib/python3.7/site-packages/sagemaker/utils.py in name_from_image(image, max_length)
     58         max_length (int): Maximum length for the resulting string (default: 63).
     59     """
---> 60     return name_from_base(base_name_from_image(image), max_length=max_length)
     61 
     62 

/opt/conda/lib/python3.7/site-packages/sagemaker/utils.py in base_name_from_image(image)
    100         str: Algorithm name, as extracted from the image name.
    101     """
--> 102     m = re.match("^(.+/)?([^:/]+)(:[^:]+)?$", image)
    103     algo_name = m.group(2) if m else image
    104     return algo_name

/opt/conda/lib/python3.7/re.py in match(pattern, string, flags)
    173     """Try to apply the pattern at the start of the string, returning
    174     a Match object, or None if no match was found."""
--> 175     return _compile(pattern, flags).match(string)
    176 
    177 def fullmatch(pattern, string, flags=0):

TypeError: expected string or bytes-like object

Upvotes: 1

Views: 2304

Answers (3)

Luk3rson

Reputation: 171

From the Lambda, return the model_url of your model.tar.gz artifact as well as the Docker image URI, and create a new model from them. This way the model object is created only once and re-used when you execute the pipeline:


from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.transformer import Transformer

# image_uri and model_url come from the Lambda step's outputs
model = Model(
    image_uri=image_uri,
    model_data=model_url,
    role=role,
    sagemaker_session=pipeline_session,
)

create_model_step = ModelStep(
    name="CreateInferenceModelStep",
    step_args=model.create(),
)

transformer = Transformer(
    model_name=create_model_step.properties.ModelName,
    instance_type="ml.m5.4xlarge",
    instance_count=3,
    accept="application/jsonlines",
    assemble_with="Line",
    output_path=Join(
        on="/",
        values=[
            "s3:/",
            bucket,
            base_job_prefix,
            "batchTransform",
            "inference",
            ExecutionVariables.PIPELINE_EXECUTION_ID,
        ],
    ),
    sagemaker_session=pipeline_session,
)

Upvotes: 0

Lévi Bernadine

Reputation: 11

You can change your Lambda function so that it not only returns the model ARN but also creates a model from the string value of model_arn, like this:

import boto3
from botocore.exceptions import ClientError

sm_client = boto3.client("sagemaker")


def handler(event, context):
    """
    Gets the latest approved model from the model registry, creates a
    SageMaker Model from it, and returns the model name.
    """
    model_package_group_name = event["model_package_group_name"]
    role = event["role"]

    # get_approved_package returns the latest approved model package
    # summary for the group (defined elsewhere)
    pck = get_approved_package(model_package_group_name)
    try:
        model_description = sm_client.describe_model_package(
            ModelPackageName=pck["ModelPackageArn"]
        )
    except ClientError as e:
        error_msg = (
            f"describe_model_package failed: "
            f"{e.response['Error']['Code']}, {e.response['Error']['Message']}"
        )
        raise Exception(error_msg)

    # Available here if you prefer to build a Model from image + artifacts
    model_url = model_description["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
    image_uri = model_description["InferenceSpecification"]["Containers"][0]["Image"]
    model_env = model_description["InferenceSpecification"]["Containers"][0]["Environment"]

    model_version = pck["ModelPackageVersion"]
    model_arn = pck["ModelPackageArn"]
    model_name = model_package_group_name + "-v" + str(model_version)

    # Point the model container directly at the model package
    container = {"ModelPackageName": model_arn}
    create_model_response = sm_client.create_model(
        ModelName=model_name, ExecutionRoleArn=role, Containers=[container]
    )

    return {
        "statusCode": 200,
        "ModelName": model_name,
    }
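The get_approved_package helper is not shown above; a possible sketch, assuming boto3's list_model_packages API (the pure selection logic is split out so it can be exercised without AWS access):

```python
def newest_approved(summaries):
    """Pick the most recently created approved package from a list of
    ModelPackageSummary dicts."""
    approved = [s for s in summaries if s.get("ModelApprovalStatus") == "Approved"]
    if not approved:
        raise ValueError("no approved model package found")
    return max(approved, key=lambda s: s["CreationTime"])


def get_approved_package(model_package_group_name):
    """Return the latest approved model package summary for the group."""
    import boto3  # imported lazily so the selection logic stays testable offline

    sm = boto3.client("sagemaker")
    response = sm.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
    )
    return newest_approved(response["ModelPackageSummaryList"])
```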

Then get the model name from the Lambda step's outputs and pass it to the Transformer (a step output can be used as a PipelineVariable):

transformer = Transformer(
    model_name=step_latest_model_fetch.properties.Outputs["ModelName"],
    instance_type="ml.m5.xlarge",
    strategy='MultiRecord',
    max_payload=18, 
    instance_count=nb_worker,
    output_path=s3_transformer_output_path,
    assemble_with="Line", 
    accept="text/csv",
    max_concurrent_transforms=nb_worker
)

# then define your transform step
step_transform = TransformStep(
    name="BatchTransform",
    transformer=transformer,
    inputs=...,  # your TransformInput
)

Upvotes: 0

Juan Sanguineti

Reputation: 1

The ModelPackage class only accepts a string as the ARN. You have to create a new model based on the model registry entry, and then use the Model class to perform the transformation.
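One way to get such a string, as a sketch (function name is illustrative): resolve the latest approved ARN at pipeline *definition* time with boto3, then pass the plain string to ModelPackage instead of a step property:

```python
def resolve_latest_approved_arn(model_package_group_name):
    """Return the latest approved model package ARN as a plain string,
    resolved when the pipeline is defined rather than when it runs."""
    import boto3  # lazy import: only needed when actually querying AWS

    sm = boto3.client("sagemaker")
    response = sm.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    return response["ModelPackageSummaryList"][0]["ModelPackageArn"]
```

Note the trade-off: this pins the model version into the pipeline definition, so you need to re-run pipeline.upsert() to pick up a newer approved version.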

Upvotes: 0
