Reputation: 506
I am implementing inference pipeline via AWS Sagemaker Pipelines with Python SDK. I have a Model Package Group in Model Registry and I want to use the latest approved model version from the package group for inference (I am going to use batch-transform inference). However, I don't know which Pipeline step to use to retrieve the latest approved model version. As a workaround, I tried to use from sagemaker.workflow.lambda_step.LambdaStep
to retrieve model version ARN and then sagemaker.ModelPackage
to define sagemaker.workflow.steps.CreateModelStep
. The minimal working code is the following
import sagemaker
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker import ModelPackage
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.inputs import CreateModelInput
role = sagemaker.get_execution_role()
sagemaker_sess = sagemaker.Session()
# create lambda function that retrieves latest approved model version ARN
function_name = f"inference-pipeline-lambda-step"
func = Lambda(
function_name=function_name,
execution_role_arn=role,
script="get_model_arn.py",
handler="get_model_arn.lambda_handler",
timeout=600,
memory_size=10240,
)
output_metric_value = LambdaOutput(output_name="model_package_arn", output_type=LambdaOutputTypeEnum.String)
# define Lambda step that retrieves latest approved model version ARN
step_get_model_arn = LambdaStep(
name="GetModelARN",
lambda_func=func,
inputs={
},
outputs=[output_metric_value]
)
# use output of the previous Lambda step to define a sagemaker Model
model = ModelPackage(
role=role,
model_package_arn=step_get_model_arn.properties.Outputs['model_package_arn'],
sagemaker_session=sagemaker_sess
)
# define CreateModelStep so that the model can be later used in Transform step for batch-transform inference
inputs = CreateModelInput(
instance_type='ml.m5.large',
)
step_create_model = CreateModelStep(
name="create-inference-model",
model=model,
inputs=inputs,
)
# Pipeline definition and creation/update
pipeline = Pipeline(
name='well-logs-inference-pipeline',
parameters=[],
steps=[
step_get_model_arn,
step_create_model
],
)
pipeline.upsert(role_arn=role)
This gives an error
TypeError: expected string or bytes-like object
As I understand it, the error happens in model = ModelPackage(...)
expression. ModelPackage requires 'model_package_arn' to be a string, however, it is sagemaker.workflow.properties.Properties
instead.
Is there a chance to retrieve model version from Model Package Group so that it can be later used in TransformStep?
The full traceback is here
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-6-63bdf0b9bf74> in <module>
65 )
66
---> 67 pipeline.upsert(role_arn=role)
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in upsert(self, role_arn, description, tags, parallelism_config)
217 """
218 try:
--> 219 response = self.create(role_arn, description, tags, parallelism_config)
220 except ClientError as e:
221 error = e.response["Error"]
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in create(self, role_arn, description, tags, parallelism_config)
114 """
115 tags = _append_project_tags(tags)
--> 116 kwargs = self._create_args(role_arn, description, parallelism_config)
117 update_args(
118 kwargs,
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in _create_args(self, role_arn, description, parallelism_config)
136 A keyword argument dict for calling create_pipeline.
137 """
--> 138 pipeline_definition = self.definition()
139 kwargs = dict(
140 PipelineName=self.name,
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in definition(self)
299 def definition(self) -> str:
300 """Converts a request structure to string representation for workflow service calls."""
--> 301 request_dict = self.to_request()
302 request_dict["PipelineExperimentConfig"] = interpolate(
303 request_dict["PipelineExperimentConfig"], {}, {}
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/pipeline.py in to_request(self)
89 if self.pipeline_experiment_config is not None
90 else None,
---> 91 "Steps": list_to_request(self.steps),
92 }
93
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/utilities.py in list_to_request(entities)
40 for entity in entities:
41 if isinstance(entity, Entity):
---> 42 request_dicts.append(entity.to_request())
43 elif isinstance(entity, StepCollection):
44 request_dicts.extend(entity.request_dicts())
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py in to_request(self)
212 def to_request(self) -> RequestType:
213 """Gets the request structure for `ConfigurableRetryStep`."""
--> 214 step_dict = super().to_request()
215 if self.retry_policies:
216 step_dict["RetryPolicies"] = self._resolve_retry_policy(self.retry_policies)
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py in to_request(self)
101 "Name": self.name,
102 "Type": self.step_type.value,
--> 103 "Arguments": self.arguments,
104 }
105 if self.depends_on:
/opt/conda/lib/python3.7/site-packages/sagemaker/workflow/steps.py in arguments(self)
411 container_defs=self.model.prepare_container_def(
412 instance_type=self.inputs.instance_type,
--> 413 accelerator_type=self.inputs.accelerator_type,
414 ),
415 vpc_config=self.model.vpc_config,
/opt/conda/lib/python3.7/site-packages/sagemaker/model.py in prepare_container_def(self, instance_type, accelerator_type, serverless_inference_config)
411 """
412 deploy_key_prefix = fw_utils.model_code_key_prefix(
--> 413 self.key_prefix, self.name, self.image_uri
414 )
415 deploy_env = copy.deepcopy(self.env)
/opt/conda/lib/python3.7/site-packages/sagemaker/fw_utils.py in model_code_key_prefix(code_location_key_prefix, model_name, image)
393 str: the key prefix to be used in uploading code
394 """
--> 395 training_job_name = sagemaker.utils.name_from_image(image)
396 return "/".join(filter(None, [code_location_key_prefix, model_name or training_job_name]))
397
/opt/conda/lib/python3.7/site-packages/sagemaker/utils.py in name_from_image(image, max_length)
58 max_length (int): Maximum length for the resulting string (default: 63).
59 """
---> 60 return name_from_base(base_name_from_image(image), max_length=max_length)
61
62
/opt/conda/lib/python3.7/site-packages/sagemaker/utils.py in base_name_from_image(image)
100 str: Algorithm name, as extracted from the image name.
101 """
--> 102 m = re.match("^(.+/)?([^:/]+)(:[^:]+)?$", image)
103 algo_name = m.group(2) if m else image
104 return algo_name
/opt/conda/lib/python3.7/re.py in match(pattern, string, flags)
173 """Try to apply the pattern at the start of the string, returning
174 a Match object, or None if no match was found."""
--> 175 return _compile(pattern, flags).match(string)
176
177 def fullmatch(pattern, string, flags=0):
TypeError: expected string or bytes-like object
Upvotes: 1
Views: 2304
Reputation: 171
From the lambda get model_url of you model.tar.gz artifact as well the docker image url and create a new model, this way the object will be created only once and re-used when you execute the pipeline:
from sagemaker.model import Model
model = Model(
image_uri=image_uri,
model_data=model_url,
role=role,
sagemaker_session=pipeline_session,
)
create_model_step = ModelStep(
name="CreateInferenceModelStep",
step_args=model.create(),
)
from sagemaker.transformer import Transformer
transformer = Transformer(
model_name= create_model_step.properties.ModelName,
instance_type="ml.m5.4xlarge",
instance_count=3,
accept="application/jsonlines",
assemble_with="Line",
output_path=Join(on='/', values=['s3:/', bucket, base_job_prefix, "batchTransform", "inference", ExecutionVariables.PIPELINE_EXECUTION_ID]),
sagemaker_session=pipeline_session,
)
Upvotes: 0
Reputation: 11
you can replace your lambda function in order to not only return the model_arn, but as well create a model with the string value of model_arn like this:
def handler(event, context):
"""
Gets the latest model from the model registry and returns the s3 location of the model data
as well as the ECR location of the container to be used for inference.
"""
# Retrieve latest approved model
model_package_group_name = event['model_package_group_name']
role = event["role"]
pck = get_approved_package(model_package_group_name)
try:
model_description = sm_client.describe_model_package(
ModelPackageName=pck["ModelPackageArn"]
)
except ClientError as e:
error_msg = f"describe_model_package failed: {e.response['Error']['Code']}, {e.response['Error']['Message']}"
raise Exception(error_msg)
model_url = model_description["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
image_uri = model_description["InferenceSpecification"]["Containers"][0]["Image"]
model_version = pck["ModelPackageVersion"]
model_arn = pck["ModelPackageArn"]
model_env = model_description["InferenceSpecification"]["Containers"][0]["Environment"]
model_name = model_package_group_name + "-v" + str(model_version)
container = {"ModelPackageName": model_arn}
create_model_respose = sm_client.create_model(ModelName=model_name, ExecutionRoleArn=role, Containers=[container] )
return {
"statusCode": 200,
"ModelName": model_name
}
then get the model Name and pass it to the Transformer (which can be a PipelineVariable)
transformer = Transformer(
model_name=step_latest_model_fetch.properties.Outputs["ModelName"],
instance_type="ml.m5.xlarge",
strategy='MultiRecord',
max_payload=18,
instance_count=nb_worker,
output_path=s3_transformer_output_path,
assemble_with="Line",
accept="text/csv",
max_concurrent_transforms=nb_worker
)
#then you have your step
TransformStep (transformer = transformer, ..)
Upvotes: 0
Reputation: 1
the class ModelPackage only receives a String as an ARN. You have to create a new model base on the model registry and then use the class model to perform a transformation
Upvotes: 0