Reputation: 1
I'm new to SageMaker. I got an error when trying to create a pipeline: "ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Step: [ESTIMATION-DM-DATA-AGGREGATED-ingestion-pipeline-feature-processor]: Step name must contain between 1 and 64 characters." This happens even though I have already called the aggregate function and passed step=aggregate.
from pyspark.sql import DataFrame, SparkSession
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import (
feature_processor,
CSVDataSource,
FeatureGroupDataSource,
)
from sagemaker.remote_function.spark_config import SparkConfig
# Bucket name handed to the SageMaker session as its default bucket.
s3_bucket = "ABC"
# NOTE(review): `sagemaker` itself is not imported in the visible snippet —
# presumably `import sagemaker` ran in an earlier cell; confirm.
sagemaker_session = sagemaker.Session(default_bucket=s3_bucket)
@remote(
    spark_config=SparkConfig(),
    instance_type="ml.m5.2xlarge",
    dependencies="./requirements.txt",
    sagemaker_session=sagemaker_session,
)
@feature_processor(
    inputs=[FeatureGroupDataSource(ESTIMATION_FG_ARN)],
    output=ESTIMATION_AGGREGATE_FG_ARN,
    target_stores=["OfflineStore"],
)
def aggregate(source_feature_group, spark):
    """Select every row of the source feature group through Spark SQL.

    The input is registered as the temporary view ``estimation_data``,
    queried with ``SELECT *``, previewed with ``show()`` and returned.

    Args:
        source_feature_group: Data loaded for the ``FeatureGroupDataSource``
            input; must support ``createOrReplaceTempView`` (presumably a
            Spark ``DataFrame`` -- confirm).
        spark: Object exposing ``sql()`` (presumably the ``SparkSession``
            supplied by the feature processor -- confirm).

    Returns:
        The result of ``spark.sql(...)`` for the query below.
    """
    # Expose the input under a fixed view name so the SQL can reference it.
    source_feature_group.createOrReplaceTempView("estimation_data")

    # Plain string literal: the query has no placeholders, so no f-string.
    agg_data = spark.sql(
        """
        SELECT *
        FROM estimation_data
        """
    )

    # Print a preview of the result to the job output for debugging.
    agg_data.show()
    return agg_data
# Call the decorated function once directly, before wiring it into a pipeline.
aggregate()

ESTIMATION_AGGREGATE_FG_NAME = 'ESTIMATION-DM-DATA-AGGREGATED'
estimation_aggregate_pipeline_name = f"{ESTIMATION_AGGREGATE_FG_NAME}-ingestion-pipeline"

# NOTE(review): the ValidationException reported for this call names the step
# "<pipeline_name>-feature-processor", and step names are limited to 64
# characters. This pipeline name is 48 characters; with the 18-character
# suffix the step name is 66 characters, so the call is rejected as written.
estimation_aggregate_pipeline_arn = fp.to_pipeline(
    pipeline_name=estimation_aggregate_pipeline_name,
    step=aggregate,
)
print(f"Created SageMaker Pipeline: {estimation_aggregate_pipeline_arn}.")

# Start one execution of the pipeline and report the value returned for it.
estimation_aggregate_pipeline_execution_arn = fp.execute(
    pipeline_name=estimation_aggregate_pipeline_name,
)
print(f"Started an execution with execution arn: {estimation_aggregate_pipeline_execution_arn}")

# Attach an enabled schedule using the expression "rate(24 hours)".
fp.schedule(
    pipeline_name=estimation_aggregate_pipeline_name,
    schedule_expression="rate(24 hours)",
    state="ENABLED",
)
print(f"Created a schedule.")

# Describe the pipeline; the return value is not stored (in a notebook it
# would be shown as the cell output).
fp.describe(pipeline_name=estimation_aggregate_pipeline_name)
ClientError Traceback (most recent call last)
Cell In[41], line 3
1 estimation_aggregate_pipeline_name = f"{ESTIMATION_AGGREGATE_FG_NAME}-ingestion-pipeline"
----> 3 estimation_aggregate_pipeline_arn = fp.to_pipeline(
4 pipeline_name=estimation_aggregate_pipeline_name,
5 step=aggregate
6 )
8 print(f"Created SageMaker Pipeline: {estimation_aggregate_pipeline_arn}.")
10 estimation_aggregate_pipeline_execution_arn = fp.execute(
11 pipeline_name=estimation_aggregate_pipeline_name
12 )
File ~/myenv/lib/python3.9/site-packages/sagemaker/feature_store/feature_processor/feature_scheduler.py:223, in to_pipeline(pipeline_name, step, role, transformation_code, max_retries, tags, sagemaker_session)
221 pipeline = Pipeline(**pipeline_request_dict)
222 logger.info("Creating/Updating sagemaker pipeline %s", pipeline_name)
--> 223 pipeline.upsert(
224 role_arn=_role,
225 tags=pipeline_tags,
226 )
227 logger.info("Created sagemaker pipeline %s", pipeline_name)
229 describe_pipeline_response = pipeline.describe()
File ~/myenv/lib/python3.9/site-packages/sagemaker/workflow/pipeline.py:297, in Pipeline.upsert(self, role_arn, description, tags, parallelism_config)
295 error_message = ce.response["Error"]["Message"]
296 if not (error_code == "ValidationException" and "already exists" in error_message):
--> 297 raise ce
298 # already exists
299 response = self.update(role_arn, description, parallelism_config=parallelism_config)
File ~/myenv/lib/python3.9/site-packages/sagemaker/workflow/pipeline.py:292, in Pipeline.upsert(self, role_arn, description, tags, parallelism_config)
290 raise ValueError("An AWS IAM role is required to create or update a Pipeline.")
291 try:
--> 292 response = self.create(role_arn, description, tags, parallelism_config)
293 except ClientError as ce:
294 error_code = ce.response["Error"]["Code"]
File ~/myenv/lib/python3.9/site-packages/sagemaker/workflow/pipeline.py:169, in Pipeline.create(self, role_arn, description, tags, parallelism_config)
164 kwargs = self._create_args(role_arn, description, parallelism_config)
165 update_args(
166 kwargs,
167 Tags=tags,
168 )
--> 169 return self.sagemaker_session.sagemaker_client.create_pipeline(**kwargs)
File ~/myenv/lib/python3.9/site-packages/botocore/client.py:565, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs)
561 raise TypeError(
562 f"{py_operation_name}() only accepts keyword arguments."
563 )
564 # The "self" in this scope is referring to the BaseClient.
--> 565 return self._make_api_call(operation_name, kwargs)
File ~/myenv/lib/python3.9/site-packages/botocore/client.py:1021, in BaseClient._make_api_call(self, operation_name, api_params)
1017 error_code = error_info.get("QueryErrorCode") or error_info.get(
1018 "Code"
1019 )
1020 error_class = self.exceptions.from_code(error_code)
-> 1021 raise error_class(parsed_response, operation_name)
1022 else:
1023 return parsed_response
ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Step: [ESTIMATION-DM-DATA-AGGREGATED-ingestion-pipeline-feature-processor]: Step name must contain between 1 and 64 characters.
I have tried many times to debug this but could not find the cause.
Upvotes: 0
Views: 90
Reputation: 1
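Changing
ESTIMATION_AGGREGATE_FG_NAME = 'ESTIMATION-DM-DATA-AGGREGATED'
to a shorter string such as
ESTIMATION_AGGREGATE_FG_NAME = 'DATA-AGGREGATED'
will solve this error. It is not obvious why the error occurs, since the length of estimation_aggregate_pipeline_name is only 48, but the error message refers to the step name, not the pipeline name. The step shown in the message is the pipeline name with "-feature-processor" (18 characters) appended, so its length is 48 + 18 = 66, which exceeds the 64-character limit. The pipeline name therefore has to be at most 46 characters; with the shorter feature group name the step name is 34 + 18 = 52 characters, which is accepted.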
Upvotes: 0