I'm trying to run a Batch Transform job using my own container, and I can't figure out what's going wrong when the job invokes the model.
Here is my Dockerfile:

```dockerfile
FROM python:3.7

COPY requirements.txt /opt/program/requirements.txt

RUN apt-get -y update && apt-get install -y --no-install-recommends \
        wget \
        nginx \
        ca-certificates \
    && rm -rf /var/lib/apt/lists/*

RUN pip3 install --no-cache-dir -r /opt/program/requirements.txt

ENV AWS_DEFAULT_REGION=eu-east-1
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"
ENV MODEL_PATH="/opt/ml/model"

COPY ./model /opt/program
WORKDIR /opt/program
```
The folder structure is as follows:

```
└── <parent_folder>/
    ├── model/
    │   ├── serve
    │   └── predictor.py
    ├── Dockerfile
    └── requirements.txt
```
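For context, `serve` is the command SageMaker runs when it starts the container (`docker run <image> serve`; it is found because `/opt/program` is on `PATH`). Its contents aren't shown above; a minimal launcher for this layout would look roughly like this (the uvicorn invocation is a sketch, not the exact file):

```shell
#!/usr/bin/env bash
# serve: executed by SageMaker as the container's start command.
# The inference server must listen on port 8080 in the foreground.
exec uvicorn predictor:app --host 0.0.0.0 --port 8080
```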
I can build the container and see it on ECR. The predictor.py looks like this:

```python
import io
import os
import pickle

import pandas as pd
from fastapi import FastAPI, HTTPException, Response, status

app = FastAPI()
model_path = os.environ['MODEL_PATH']


class ScoringService:
    model = None

    @classmethod
    def get_model(cls):
        if cls.model is None:
            with open(os.path.join(model_path, 'model.pkl'), 'rb') as f:
                cls.model = pickle.load(f)
        return cls.model

    @classmethod
    def predict(cls, x):
        clf = cls.get_model()
        return clf.predict(x)


@app.get('/ping')
def ping():
    health = ScoringService.get_model() is not None
    status_code = status.HTTP_200_OK if health else status.HTTP_404_NOT_FOUND
    return Response(status_code=status_code)


@app.post('/invocations', status_code=status.HTTP_200_OK)
def transformation(content_type: str, data: bytes):
    if content_type != 'text/csv':
        raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
                            detail='This predictor only supports CSV data')
    print('----- entered transformation ok')
    data = data.decode('utf-8')
    s = io.StringIO(data)
    df = pd.read_csv(s, header=None)
    print('----- data read ok')
    print(f'Invoked with {df.shape[0]} records')
    predictions = ScoringService.predict(df)
    print('----- predict ok')
    out = io.StringIO()
    pd.DataFrame({'results': predictions}).to_csv(out, header=False, index=False)
    result = out.getvalue()
    print('----- results conversion ok')
    return Response(content=result, media_type='text/csv')
```
The health check works just fine, so the batch transform job starts successfully, but when it tries to do the predictions via the /invocations method, it fails with this error:

```
POST /invocations HTTP/1.1" 422 Unprocessable Entity
```
The model artifact was already in an S3 bucket at the moment I created the SageMaker model, and the container image is the one built from the Dockerfile shown above. The input data is a CSV generated from pandas, also stored in an S3 bucket.
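As I understand the container contract, during the transform SageMaker POSTs each batch of records to `/invocations` with the data as the raw request body and the `ContentType` from `TransformInput` as a `Content-Type` header (not as query parameters or form fields). A stdlib-only sketch of that request shape, using a throwaway local server just to inspect what arrives:

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

received = {}  # what the "container" saw

class Handler(BaseHTTPRequestHandler):
    def do_POST(self):
        # SageMaker-style request: raw bytes in the body, type in a header.
        length = int(self.headers["Content-Length"])
        received["content_type"] = self.headers["Content-Type"]
        received["body"] = self.rfile.read(length)
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):  # silence request logging
        pass

server = HTTPServer(("127.0.0.1", 0), Handler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Mimic the transform job's invocation of the model server.
conn = http.client.HTTPConnection("127.0.0.1", server.server_port)
conn.request("POST", "/invocations", body=b"1,2,3\n4,5,6\n",
             headers={"Content-Type": "text/csv"})
resp = conn.getresponse()
resp.read()
server.shutdown()

print(received["content_type"])  # text/csv
print(received["body"])          # b'1,2,3\n4,5,6\n'
```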
The configuration of my Batch Transform job is as follows:

```python
client.create_transform_job(
    TransformJobName=f'mlbatchtransform-{datetime.datetime.now().strftime("%Y%m%d%M%S")}',
    ModelName='mlbatchtransform',
    MaxConcurrentTransforms=1,
    BatchStrategy='MultiRecord',
    ModelClientConfig={
        'InvocationsTimeoutInSeconds': 100,
        'InvocationsMaxRetries': 3
    },
    MaxPayloadInMB=50,
    TransformInput={
        'DataSource': {
            'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': 's3://<bucket>/mlbatchtransform/inference/input/inference.csv',
            }
        },
        'ContentType': 'text/csv',
        'CompressionType': 'None',
        'SplitType': 'Line',
    },
    TransformOutput={
        'S3OutputPath': 's3://<bucket>/mlbatchtransform/inference/output',
        'Accept': 'text/csv',
        'AssembleWith': 'None',
    },
    DataCaptureConfig={
        'DestinationS3Uri': 's3://<bucket>/mlbatchtransform/datacapture',
        'GenerateInferenceId': True
    },
    TransformResources={
        'InstanceType': 'ml.m5.large',
        'InstanceCount': 1,
    },
)
```
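For reference, my understanding of `BatchStrategy='MultiRecord'` with `SplitType='Line'` is that SageMaker splits the input file on newlines and packs as many records as fit under `MaxPayloadInMB` into each POST to `/invocations`. A stdlib-only illustration of that batching (`batch_lines` is a hypothetical helper of mine, not part of boto3 or SageMaker):

```python
def batch_lines(data: bytes, max_payload_bytes: int):
    """Split on newlines and pack lines into payloads under a size cap,
    mimicking BatchStrategy='MultiRecord' + SplitType='Line'."""
    batches, current, size = [], [], 0
    for line in data.splitlines(keepends=True):
        if current and size + len(line) > max_payload_bytes:
            batches.append(b"".join(current))
            current, size = [], 0
        current.append(line)
        size += len(line)
    if current:
        batches.append(b"".join(current))
    return batches

csv_bytes = b"1,2,3\n4,5,6\n7,8,9\n"
payloads = batch_lines(csv_bytes, max_payload_bytes=12)
print(payloads)  # [b'1,2,3\n4,5,6\n', b'7,8,9\n']
```

With the real job's 50 MB cap and a small CSV, the whole file would land in a single invocation.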
I've also tried setting the S3DataSource's S3Uri to the prefix instead of the file: `'S3Uri': 's3://<bucket>/mlbatchtransform/inference/input'`.
I've tried refactoring the invocations method to receive only one argument (e.g. `request`) and use `request.data` and `request.content_type` to get those values. I've also tried saving the pandas DataFrame both with and without a header.