Amatuer
Amatuer

Reputation: 1

ValidationException in Sagemaker pipeline creation: Step name must contain between 1 and 64 characters

I'm new to SageMaker. I got an error when trying to create a pipeline: "ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Step: [ESTIMATION-DM-DATA-AGGREGATED-ingestion-pipeline-feature-processor]: Step name must contain between 1 and 64 characters." This happens even though I had already called the aggregate function and passed step=aggregate.

from pyspark.sql import DataFrame, SparkSession
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import (
    feature_processor,
    CSVDataSource,
    FeatureGroupDataSource,
)

from sagemaker.remote_function.spark_config import SparkConfig

# Name of the S3 bucket given to the SageMaker session as its default bucket.
s3_bucket = "ABC"
# Session object that is passed to the @remote decorator.
# NOTE(review): `sagemaker` is not imported in the visible snippet — presumably
# it was imported in an earlier notebook cell; confirm.
sagemaker_session = sagemaker.Session(default_bucket=s3_bucket)

@remote(
    spark_config=SparkConfig(),
    instance_type="ml.m5.2xlarge",
    dependencies="./requirements.txt",
    sagemaker_session=sagemaker_session,
)
@feature_processor(
    inputs=[FeatureGroupDataSource(ESTIMATION_FG_ARN)],
    output=ESTIMATION_AGGREGATE_FG_ARN,
    target_stores=["OfflineStore"],
)
def aggregate(source_feature_group, spark):
    """Select every row of the source feature group with a Spark SQL query.

    Args:
        source_feature_group: Input data; it is registered as the temporary
            view ``estimation_data``. Presumably the DataFrame that the
            ``feature_processor`` decorator loads from ``ESTIMATION_FG_ARN``
            (confirm against the SDK).
        spark: Object whose ``sql`` method runs the query (presumably the
            active SparkSession; confirm).

    Returns:
        The result of ``SELECT * FROM estimation_data``.
    """
    # Register the input under a fixed view name so the SQL can refer to it.
    source_feature_group.createOrReplaceTempView("estimation_data")
    agg_data = spark.sql(
        """
        SELECT *
        FROM estimation_data
        """
    )

    # Print the result as a debugging aid before handing it back.
    agg_data.show()
    return agg_data
# Invoke the decorated function once directly, before any pipeline is created.
aggregate()
# Name fragment from which the pipeline name is built.
ESTIMATION_AGGREGATE_FG_NAME = 'ESTIMATION-DM-DATA-AGGREGATED'

# Pipeline name = the name above + "-ingestion-pipeline" (48 characters in total).
# NOTE(review): the ValidationException reported for the to_pipeline call names
# the step "<pipeline_name>-feature-processor", which is 66 characters here and
# therefore over the 64-character step-name limit; a shorter pipeline name
# keeps the step name within the limit.
# NOTE(review): `fp` is not defined in the visible snippet — presumably an alias
# for the sagemaker.feature_store.feature_processor module; confirm.
estimation_aggregate_pipeline_name = f"{ESTIMATION_AGGREGATE_FG_NAME}-ingestion-pipeline"
estimation_aggregate_pipeline_arn = fp.to_pipeline(
    pipeline_name=estimation_aggregate_pipeline_name,
    step=aggregate
)

print(f"Created SageMaker Pipeline: {estimation_aggregate_pipeline_arn}.")

# Start an execution of the pipeline by name and keep the returned execution ARN.
estimation_aggregate_pipeline_execution_arn = fp.execute(
    pipeline_name=estimation_aggregate_pipeline_name
)
print(f"Started an execution with execution arn: {estimation_aggregate_pipeline_execution_arn}")


# Attach an enabled schedule to the pipeline using a 24-hour rate expression.
fp.schedule(
    pipeline_name=estimation_aggregate_pipeline_name,
    schedule_expression="rate(24 hours)",
    state="ENABLED",
)
print(f"Created a schedule.")

# Look up the pipeline by name; as the last expression in a notebook cell the
# result would be displayed (presumably the pipeline/schedule details; confirm).
fp.describe(pipeline_name=estimation_aggregate_pipeline_name)
ClientError                               Traceback (most recent call last)
Cell In[41], line 3
      1 estimation_aggregate_pipeline_name = f"{ESTIMATION_AGGREGATE_FG_NAME}-ingestion-pipeline"
----> 3 estimation_aggregate_pipeline_arn = fp.to_pipeline(
      4     pipeline_name=estimation_aggregate_pipeline_name,
      5     step=aggregate
      6 )
      8 print(f"Created SageMaker Pipeline: {estimation_aggregate_pipeline_arn}.")
     10 estimation_aggregate_pipeline_execution_arn = fp.execute(
     11     pipeline_name=estimation_aggregate_pipeline_name
     12 )

File ~/myenv/lib/python3.9/site-packages/sagemaker/feature_store/feature_processor/feature_scheduler.py:223, in to_pipeline(pipeline_name, step, role, transformation_code, max_retries, tags, sagemaker_session)
    221 pipeline = Pipeline(**pipeline_request_dict)
    222 logger.info("Creating/Updating sagemaker pipeline %s", pipeline_name)
--> 223 pipeline.upsert(
    224     role_arn=_role,
    225     tags=pipeline_tags,
    226 )
    227 logger.info("Created sagemaker pipeline %s", pipeline_name)
    229 describe_pipeline_response = pipeline.describe()

File ~/myenv/lib/python3.9/site-packages/sagemaker/workflow/pipeline.py:297, in Pipeline.upsert(self, role_arn, description, tags, parallelism_config)
    295 error_message = ce.response["Error"]["Message"]
    296 if not (error_code == "ValidationException" and "already exists" in error_message):
--> 297     raise ce
    298 # already exists
    299 response = self.update(role_arn, description, parallelism_config=parallelism_config)

File ~/myenv/lib/python3.9/site-packages/sagemaker/workflow/pipeline.py:292, in Pipeline.upsert(self, role_arn, description, tags, parallelism_config)
    290     raise ValueError("An AWS IAM role is required to create or update a Pipeline.")
    291 try:
--> 292     response = self.create(role_arn, description, tags, parallelism_config)
    293 except ClientError as ce:
    294     error_code = ce.response["Error"]["Code"]

File ~/myenv/lib/python3.9/site-packages/sagemaker/workflow/pipeline.py:169, in Pipeline.create(self, role_arn, description, tags, parallelism_config)
    164 kwargs = self._create_args(role_arn, description, parallelism_config)
    165 update_args(
    166     kwargs,
    167     Tags=tags,
    168 )
--> 169 return self.sagemaker_session.sagemaker_client.create_pipeline(**kwargs)

File ~/myenv/lib/python3.9/site-packages/botocore/client.py:565, in ClientCreator._create_api_method.<locals>._api_call(self, *args, **kwargs)
    561     raise TypeError(
    562         f"{py_operation_name}() only accepts keyword arguments."
    563     )
    564 # The "self" in this scope is referring to the BaseClient.
--> 565 return self._make_api_call(operation_name, kwargs)

File ~/myenv/lib/python3.9/site-packages/botocore/client.py:1021, in BaseClient._make_api_call(self, operation_name, api_params)
   1017     error_code = error_info.get("QueryErrorCode") or error_info.get(
   1018         "Code"
   1019     )
   1020     error_class = self.exceptions.from_code(error_code)
-> 1021     raise error_class(parsed_response, operation_name)
   1022 else:
   1023     return parsed_response

ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Step: [ESTIMATION-DM-DATA-AGGREGATED-ingestion-pipeline-feature-processor]: Step name must contain between 1 and 64 characters.

I have tried many times to debug this, but I could not find the cause.

Upvotes: 0

Views: 90

Answers (1)

Amatuer
Amatuer

Reputation: 1

Changing

ESTIMATION_AGGREGATE_FG_NAME = 'ESTIMATION-DM-DATA-AGGREGATED'

to a shorter string, such as

ESTIMATION_AGGREGATE_FG_NAME = 'DATA-AGGREGATED'

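will solve this error. At first I did not understand why I got it, since len(estimation_aggregate_pipeline_name) is only 48. The limit applies to the step name, not the pipeline name: as the error message shows, the step is named "<pipeline name>-feature-processor", and that 18-character suffix brings the total to 66 characters, which exceeds the 64-character limit.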

Upvotes: 0

Related Questions