Francesco Camussoni

Reputation: 1

422 Unprocessable Entity on SageMaker Batch Transform Job

I'm trying to run a Batch Transform Job using my own container, and I don't know what the problem could be with the invocation of the job.

Here is my Dockerfile:

FROM python:3.7
COPY requirements.txt /opt/program/requirements.txt 
RUN apt-get -y update && apt-get install -y --no-install-recommends \
        wget \
        nginx \
        ca-certificates \
    && rm -rf /var/lib/apt/lists/*
RUN pip3 install --no-cache-dir -r /opt/program/requirements.txt
ENV AWS_DEFAULT_REGION=eu-east-1
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"
ENV MODEL_PATH="/opt/ml/model"

COPY ./model /opt/program
WORKDIR /opt/program

The folder structure is as follows:

└── <parent_folder>/
    ├── model/
    │   ├── serve
    │   └── predictor.py
    ├── Dockerfile
    └── requirements.txt

I can build the container and see it on ECR. The predictor.py looks like this:

import os
import pickle
import io
from fastapi import FastAPI, HTTPException, Response, status
import pandas as pd

app = FastAPI()

model_path = os.environ['MODEL_PATH']

class ScoringService:
    model = None

    @classmethod
    def get_model(cls):
        if cls.model is None:
            with open(os.path.join(model_path, 'model.pkl'), 'rb') as f:
                cls.model = pickle.load(f)
        return cls.model

    @classmethod
    def predict(cls, x):
        clf = cls.get_model()
        return clf.predict(x)


@app.get('/ping')
def ping():
    health = ScoringService.get_model() is not None
    status_code = status.HTTP_200_OK if health else status.HTTP_404_NOT_FOUND
    return Response(status_code=status_code)


@app.post('/invocations', status_code=status.HTTP_200_OK)
def transformation(content_type: str, data: bytes):
    if content_type != 'text/csv':
        raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
                            detail='This predictor only supports CSV data')
    print('----- entered transformation ok')
    
    data = data.decode('utf-8')
    s = io.StringIO(data)
    df = pd.read_csv(s, header=None)

    print('----- data read ok')
    
    print(f'Invoked with {df.shape[0]} records')

    predictions = ScoringService.predict(df)

    print('----- predict ok')

    out = io.StringIO()
    pd.DataFrame({'results': predictions}).to_csv(out, header=False, index=False)
    result = out.getvalue()

    print('----- results conversion ok')

    return Response(content=result, media_type='text/csv')

The health check works just fine, so the batch transform job starts successfully, but when it tries to run the predictions through the /invocations method it fails with this error:

POST /invocations HTTP/1.1" 422 Unprocessable Entity

The model was in an S3 bucket at the moment I created the SageMaker model, and the container image is the one I built with the Dockerfile shown above. The data is a CSV generated from pandas that also lives in an S3 bucket.
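
This is roughly how the input CSV is produced and uploaded (a minimal sketch; the DataFrame name df, the local file name, and the bucket placeholder are stand-ins rather than my exact code):

import boto3
import pandas as pd

# df is the inference DataFrame (placeholder name); written without an index,
# and I've tried saving it both with and without a header
df.to_csv('inference.csv', header=False, index=False)

# upload to the transform input prefix used in the job configuration below
boto3.client('s3').upload_file(
    'inference.csv',
    '<bucket>',
    'mlbatchtransform/inference/input/inference.csv',
)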

The configuration of my Batch Transform Job is as follows:

client.create_transform_job(
    TransformJobName=f'mlbatchtransform-{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}',
    ModelName='mlbatchtransform',
    MaxConcurrentTransforms=1,
    BatchStrategy='MultiRecord',
    ModelClientConfig={
        'InvocationsTimeoutInSeconds': 100,
        'InvocationsMaxRetries': 3
    },
    MaxPayloadInMB=50,
    TransformInput={
        'DataSource': {
            'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': 's3://<bucket>/mlbatchtransform/inference/input/inference.csv',
            }
        },
        'ContentType': 'text/csv',
        'CompressionType': 'None',
        'SplitType': 'Line',
    },
    TransformOutput={
        'S3OutputPath': 's3://<bucket>/mlbatchtransform/inference/output',
        'Accept': 'text/csv',
        'AssembleWith': 'None',
    },
    DataCaptureConfig={
        'DestinationS3Uri': 's3://<bucket>/mlbatchtransform/datacapture',
        'GenerateInferenceId': True
    },
    TransformResources={
        'InstanceType': 'ml.m5.large',
        'InstanceCount': 1,
    },
)

I've also tried to set the S3DataSource with just the prefix, like this: 'S3Uri': 's3://<bucket>/mlbatchtransform/inference/input'.

I've tried refactoring the invocations method to receive only one argument, for example request, and using request.data and request.content_type to get those values. I've also tried saving the pandas DataFrame both with and without a header.
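
That single-argument refactor looked roughly like this (a sketch written against FastAPI's Request object, reading the raw bytes with await request.body() and the content type from request.headers; the rest of the handler stays the same):

from fastapi import Request

@app.post('/invocations', status_code=status.HTTP_200_OK)
async def transformation(request: Request):
    # read the content type and raw body directly from the incoming request
    content_type = request.headers.get('content-type', '')
    body = await request.body()

    if content_type != 'text/csv':
        raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
                            detail='This predictor only supports CSV data')

    df = pd.read_csv(io.StringIO(body.decode('utf-8')), header=None)
    predictions = ScoringService.predict(df)

    out = io.StringIO()
    pd.DataFrame({'results': predictions}).to_csv(out, header=False, index=False)
    return Response(content=out.getvalue(), media_type='text/csv')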

Upvotes: 0

Views: 69

Answers (0)
