Reputation: 43
I am very new to both AI and MLOps, so please forgive me if this question is basic. I am trying to learn Kubeflow, but the documentation is extensive, and the differences between the versions below and above 2 add to the complexity. I have Python scripts that train a TinyVGG CNN model and then evaluate it, and they work fine. I want to turn these scripts into a pipeline, but I do not understand how to create kfp components from those files. Please guide me on how to break these scripts into components and then use them in a pipeline. One of my main problems is how to pass a complex data type such as torch.utils.data.dataloader.DataLoader from one component to another. I have checked the Kubeflow documentation, but it only gives a simple example:
https://www.kubeflow.org/docs/components/pipelines/v2/components/containerized-python-components/
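To make the DataLoader part of the question concrete: each component runs in its own container, so a live DataLoader object cannot travel between steps; only small parameters and files can. A minimal sketch of the file-based alternative follows, using plain functions and the standard library only. The function names, the JSON "recipe" format and the file path are assumptions for illustration; in a KFP v2 component the file path would typically come from an output artifact (for example the .path of an Output[Dataset]), and the consuming step would rebuild the DataLoaders itself.

```python
import json
import tempfile
from pathlib import Path


def write_loader_recipe(out_path, train_dir, test_dir, batch_size, num_workers=0):
    """Producer step: save the settings needed to rebuild the DataLoaders."""
    recipe = {
        "train_dir": train_dir,
        "test_dir": test_dir,
        "batch_size": batch_size,
        "num_workers": num_workers,
    }
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(recipe))


def read_loader_recipe(in_path):
    """Consumer step: load the settings back.

    The real step would then build its own DataLoaders from these values,
    e.g. by calling a helper like data_setup.create_dataloaders locally.
    """
    return json.loads(Path(in_path).read_text())


# Simulate two steps that share nothing but a file (hypothetical path).
with tempfile.TemporaryDirectory() as tmp:
    recipe_file = Path(tmp) / "artifacts" / "loader_recipe.json"
    write_loader_recipe(
        recipe_file,
        train_dir="dataset/pizza_steak_sushi/train",
        test_dir="dataset/pizza_steak_sushi/test",
        batch_size=32,
    )
    recipe = read_loader_recipe(recipe_file)

print(recipe)
```

The point of the sketch is that the second step receives a description of the data, not the loader object, and constructs the loader inside its own process.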
This is how my script setup looks:
.
├── __pycache__
│ ├── data_setup.cpython-310.pyc
│ ├── engine.cpython-310.pyc
│ ├── model_builder.cpython-310.pyc
│ └── utils.cpython-310.pyc
├── data_setup.py
├── engine.py
├── get_data.py
├── model_builder.py
├── predict.py
├── train.py
└── utils.py
To train, I simply run the train.py module with arguments:
python modular_ML_code/going_modular_argparse/train.py --num_epochs 10 --hidden_units=128 --train_dir=<path to train dir> --test_dir=<path to test_dir>
In my train.py file, I import the helper modules like this:
import data_setup, engine, model_builder, utils
...
train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
train_dir=train_dir,
test_dir=test_dir,
transform=data_transform,
batch_size=BATCH_SIZE
)
...
%%writefile modular_ML_code/going_modular_argparse/train.py
"""
Trains a PyTorch image classification model using device-agnostic code.
"""
import os
import argparse
import torch
import data_setup, engine, model_builder, utils
from torchvision import transforms
#Create a parser
parser = argparse.ArgumentParser(description="Get some hyperparameters")
# Get an arg for num_epochs
parser.add_argument("--num_epochs",
type=int,
help="the number of epochs to train for",
default=10)
# Get an arg for batch_size
parser.add_argument("--batch_size",
default=32,
type=int,
help="number of samples per batch")
# Get an arg for hidden_units
parser.add_argument("--hidden_units",
default=10,
type=int,
help="number of hidden units in hidden layers")
# Get an arg for learning_rate
parser.add_argument("--learning_rate",
default=0.001,
type=float,
help="learning rate use for model")
# Create a arg for the training directory
parser.add_argument("--train_dir",
default="modular_ML_code/data/pizza_steak_sushi/train",
type=str,
help="Directory file path to training data in standard image classification format")
# Create a arg for the testing directory
parser.add_argument("--test_dir",
default="modular_ML_code/data/pizza_steak_sushi/test",
type=str,
help="Directory file path to testing data in standard image classification format")
# Get out arguments from the parser
args = parser.parse_args()
# Setup hyperparameters
NUM_EPOCHS = args.num_epochs
BATCH_SIZE = args.batch_size
HIDDEN_UNITS = args.hidden_units
LEARNING_RATE = args.learning_rate
# Setup directories
train_dir = args.train_dir
test_dir = args.test_dir
# Setup target device
device = "cuda" if torch.cuda.is_available() else "cpu"
# Create transforms
data_transform = transforms.Compose([
transforms.Resize((64, 64)),
transforms.ToTensor()
])
# Create DataLoaders with help from data_setup.py
train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
train_dir=train_dir,
test_dir=test_dir,
transform=data_transform,
batch_size=BATCH_SIZE
)
# Create model with help from model_builder.py
model = model_builder.TinyVGG(
input_shape=3,
hidden_units=HIDDEN_UNITS,
output_shape=len(class_names)
).to(device)
# Set loss and optimizer
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),
lr=LEARNING_RATE)
# Start training with help from engine.py
engine.train(model=model,
train_dataloader=train_dataloader,
test_dataloader=test_dataloader,
loss_fn=loss_fn,
optimizer=optimizer,
epochs=NUM_EPOCHS,
device=device)
# Save the model with help from utils.py
utils.save_model(model=model,
target_dir="models",
model_name="05_going_modular_script_mode_tinyvgg_model.pth")
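One possible first step toward reusing this script in a pipeline is to move the argument parsing into functions, so the same entry point can be called from the command line or from another Python function. The sketch below is an assumption about how that refactor might look, not part of the original script: build_parser and main are hypothetical names, only four of the flags are reproduced, and the training body is left as a comment.

```python
import argparse


def build_parser():
    """Create the same kind of parser as train.py, without parsing at import time."""
    parser = argparse.ArgumentParser(description="Get some hyperparameters")
    parser.add_argument("--num_epochs", type=int, default=10,
                        help="the number of epochs to train for")
    parser.add_argument("--batch_size", type=int, default=32,
                        help="number of samples per batch")
    parser.add_argument("--hidden_units", type=int, default=10,
                        help="number of hidden units in hidden layers")
    parser.add_argument("--learning_rate", type=float, default=0.001,
                        help="learning rate to use for the model")
    return parser


def main(argv=None):
    """Parse argv (a list of strings) and return the settings as a dict."""
    args = build_parser().parse_args(argv)
    # The data loading, model creation and engine.train(...) calls would go here.
    return vars(args)


# Calling main with an explicit list behaves like passing flags on the command line.
settings = main(["--num_epochs", "3", "--hidden_units", "128"])
print(settings)
```

With this shape, a pipeline step (or a container command) can invoke the same logic without relying on module-level code that runs on import.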
The output looks like this:
0%| | 0/10 [00:00<?, ?it/s]Epoch: 1 | train_loss: 1.1460 | train_acc: 0.2891 | test_loss: 1.0858 | test_acc: 0.2708
10%|████▍ | 1/10 [00:07<01:07, 7.45s/it]Epoch: 2 | train_loss: 1.0724 | train_acc: 0.2930 | test_loss: 1.0242 | test_acc: 0.3201
20%|████████▊ | 2/10 [00:14<00:58, 7.32s/it]Epoch: 3 | train_loss: 1.0318 | train_acc: 0.5586 | test_loss: 1.0779 | test_acc: 0.3930
30%|█████████████▏ | 3/10 [00:22<00:51, 7.38s/it]Epoch: 4 | train_loss: 1.0128 | train_acc: 0.5000 | test_loss: 1.2437 | test_acc: 0.4034
40%|█████████████████▌ | 4/10 [00:29<00:43, 7.32s/it]Epoch: 5 | train_loss: 1.0905 | train_acc: 0.4336 | test_loss: 1.0099 | test_acc: 0.5852
50%|██████████████████████ | 5/10 [00:36<00:36, 7.30s/it]Epoch: 6 | train_loss: 0.9365 | train_acc: 0.6367 | test_loss: 1.0314 | test_acc: 0.4025
60%|██████████████████████████▍ | 6/10 [00:43<00:29, 7.26s/it]Epoch: 7 | train_loss: 0.9894 | train_acc: 0.5195 | test_loss: 1.0308 | test_acc: 0.4025
70%|██████████████████████████████▊ | 7/10 [00:51<00:21, 7.30s/it]Epoch: 8 | train_loss: 1.0748 | train_acc: 0.5156 | test_loss: 1.0312 | test_acc: 0.3939
80%|███████████████████████████████████▏ | 8/10 [00:58<00:14, 7.37s/it]Epoch: 9 | train_loss: 0.9672 | train_acc: 0.3867 | test_loss: 0.9279 | test_acc: 0.6250
90%|███████████████████████████████████████▌ | 9/10 [01:05<00:07, 7.33s/it]Epoch: 10 | train_loss: 0.8184 | train_acc: 0.6758 | test_loss: 0.9775 | test_acc: 0.4233
100%|███████████████████████████████████████████| 10/10 [01:13<00:00, 7.31s/it]
[INFO] Saving model to: models/05_going_modular_script_mode_tinyvgg_model.pth
What I want is to break these steps into separate components and create an MLOps pipeline. Could you please help?
Below is what I tried:
import kfp
from kfp import dsl
from kfp import compiler
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics,
Metrics, component)
@component(
    base_image="python:3.10",
    packages_to_install=["boto3","requests","pathlib"]
)
def download_dataset(input_bucket: str, data_path: str
):
    """Download the custom data set to the Kubeflow Pipelines volume to share it among all steps"""
    import os
    import zipfile
    import requests
    from pathlib import Path
    import boto3
    from botocore.client import Config
    s3 = boto3.client(
        "s3",
        endpoint_url="http://minio-service.kubeflow:9000",
        aws_access_key_id="minio",
        aws_secret_access_key="minio123",
        config=Config(signature_version="s3v4"),
    )
    # Create export bucket if it does not yet exist
    response = s3.list_buckets()
    input_bucket_exists = False
    for bucket in response["Buckets"]:
        if bucket["Name"] == input_bucket:
            input_bucket_exists = True
    if not input_bucket_exists:
        s3.create_bucket(ACL="public-read-write", Bucket=input_bucket)
    # Save zip files to S3 import_bucket
    data_path = Path(data_path)
    if data_path.is_dir():
        print(f"{data_path} directory exists.")
    else:
        print(f"Did not find {data_path} directory, creating one...")
        data_path.mkdir(parents=True,exist_ok=True)
    # Download pizza , steak and sushi data
    with open(data_path/ "pizza_steak_sushi.zip", "wb") as f:
        request = requests.get("https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip")
        print("Downloading pizza, steak, sushi data...")
        f.write(request.content)
    for root, dir, files in os.walk(data_path):
        for filename in files:
            local_path = os.path.join(root,filename)
            s3.upload_file(
                local_path,
                input_bucket,
                f"{local_path}",
                ExtraArgs={"ACL": "public-read"},
            )
@component(
    base_image="python:3.10",
    packages_to_install=["torch","torchvision","boto3","pathlib","requests"]
)
def process_data(
    input_bucket: str,
    data_path: str,
    train_dir: str,
    test_dir: str,
    batch_size: int,
    num_workers: int=0,
):
    import os
    from torchvision import datasets, transforms
    from torch.utils.data import DataLoader
    import boto3
    import zipfile
    from pathlib import Path
    from botocore.client import Config
    import requests
    def create_dataloaders(
        train_dir: str,
        test_dir: str,
        transform: transforms.Compose,
        batch_size: int,
        num_workers: int=num_workers
    ):
        # Use ImageFolder to create dataset(s)
        train_data = datasets.ImageFolder(train_dir, transform=transform)
        test_data = datasets.ImageFolder(test_dir, transform=transform)
        # Get class names
        class_names = train_data.classes
        # Turn images into data loaders
        train_dataloader = DataLoader(
            train_data,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            pin_memory=True,
        )
        test_dataloader = DataLoader(
            test_data,
            batch_size=batch_size,
            shuffle=False, # don't need to shuffle test data
            num_workers=num_workers,
            pin_memory=True,
        )
        return train_dataloader, test_dataloader, class_names
    data_transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor()
    ])
    data_path = Path(data_path)
    image_path = data_path / "pizza_steak_sushi"
    if image_path.is_dir():
        print(f"{image_path} directory exists.")
    else:
        print(f"Did not find {image_path} directory, creating one...")
        image_path.mkdir(parents=True,exist_ok=True)
    with open(data_path/ "pizza_steak_sushi.zip", "wb") as f:
        request = requests.get("https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip")
        print("Downloading pizza, steak, sushi data...")
        f.write(request.content)
    # Unzip the data
    with zipfile.ZipFile(data_path/"pizza_steak_sushi.zip", "r") as zip_ref:
        print("Unzipping pizza, steak, sushi data...")
        zip_ref.extractall(image_path)
    # Remove the zip file
    os.remove(data_path / "pizza_steak_sushi.zip")
    # Create DataLoaders with help from data_setup.py
    train_dataloader, test_dataloader, class_names = create_dataloaders(
        train_dir=train_dir,
        test_dir=test_dir,
        transform=data_transform,
        batch_size=batch_size
    )
from kfp import compiler
INPUT_BUCKET = "inputdata"
DATA_PATH = "dataset"
TRAIN_DIR = "dataset/pizza_steak_sushi/train"
TEST_DIR = "dataset/pizza_steak_sushi/test"
BATCH_SIZE = 32
HIDDEN_UNITS = 10
@dsl.pipeline(
    name="End-to-End-MNIST",
    description="A sample pipeline to demonstrate multi-step model training, evaluation, export, and serving",
)
def my_pipeline(input_bucket: str = INPUT_BUCKET,
                data_path: str = DATA_PATH,
                train_dir: str = TRAIN_DIR,
                test_dir: str = TEST_DIR,
                batch_size: int = BATCH_SIZE,
                hidden_units: int = HIDDEN_UNITS
                ):
    # Pass the pipeline parameters (not the module-level constants) so values set at run time take effect
    import_data_op = download_dataset(input_bucket=input_bucket,
                                      data_path=data_path)
    create_data_loader_op = process_data(input_bucket=input_bucket,
                                         data_path=data_path,
                                         train_dir=train_dir,
                                         test_dir=test_dir,
                                         batch_size=batch_size,
                                         )
    # train_model is the model component shown further down in this post
    create_model_op = train_model(hidden_units=hidden_units)
    create_data_loader_op.after(import_data_op)
    create_model_op.after(create_data_loader_op)
if __name__ == '__main__':
    compiler.Compiler().compile( my_pipeline,'pipeline2.yaml')
    from kfp.client import Client
    client = Client(host='http://localhost:8002')
    run = client.create_run_from_pipeline_package(
        'pipeline2.yaml',
    )
Now I want to pass the class_names value assigned in this step to the next step, which creates the model:
@component(
    base_image="python:3.10",
    packages_to_install=["torch","torchvision","boto3","pathlib","requests"]
)
def train_model(
    hidden_units: int,
):
    import torch
    from torch import nn
    class TinyVGG(nn.Module):
        def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
            super().__init__()
            self.conv_block_1 = nn.Sequential(
                nn.Conv2d(in_channels=input_shape,
                          out_channels=hidden_units,
                          kernel_size=3,
                          stride=1,
                          padding=0),
                nn.ReLU(),
                nn.Conv2d(in_channels=hidden_units,
                          out_channels=hidden_units,
                          kernel_size=3,
                          stride=1,
                          padding=0),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2,
                             stride=2)
            )
            self.conv_block_2 = nn.Sequential(
                nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=0),
                nn.ReLU(),
                nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=0),
                nn.ReLU(),
                nn.MaxPool2d(2)
            )
            self.classifier = nn.Sequential(
                nn.Flatten(),
                # Where did this in_features shape come from?
                # It's because each layer of our network compresses and changes the shape of our inputs data.
                nn.Linear(in_features=hidden_units*13*13,
                          out_features=output_shape)
            )
        def forward(self, x: torch.Tensor):
            x = self.conv_block_1(x)
            x = self.conv_block_2(x)
            x = self.classifier(x)
            return x
            # return self.classifier(self.conv_block_2(self.conv_block_1(x))) # <- leverage the benefits of operator fusion
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = TinyVGG(
        input_shape=3,
        hidden_units=hidden_units,
        output_shape=len(class_names)
    ).to(device)
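In the component above, class_names is not defined inside train_model, which is the value I am trying to pass in. One way to avoid passing it at all, sketched below, is to recompute it inside the model step from the training directory. As far as I understand, torchvision's ImageFolder builds its classes list from the sorted sub-directory names, so the standard library can reproduce that list without loading any images. find_class_names is a hypothetical helper, and the temporary folders stand in for the real pizza/steak/sushi training directory.

```python
import os
import tempfile


def find_class_names(train_dir):
    """Return the sorted sub-directory names of train_dir, one per class."""
    return sorted(entry.name for entry in os.scandir(train_dir) if entry.is_dir())


# Build a throwaway directory tree that mimics train/<class_name>/ folders.
with tempfile.TemporaryDirectory() as train_dir:
    for name in ("sushi", "pizza", "steak"):
        os.makedirs(os.path.join(train_dir, name))
    class_names = find_class_names(train_dir)

# len(class_names) is the value that TinyVGG needs for output_shape.
print(class_names, len(class_names))
```

If the list does need to cross a component boundary, it is small enough to be written to a JSON file or passed as a plain list parameter rather than as part of a DataLoader.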
Upvotes: 0
Views: 314
Reputation: 43
I was able to create a pipeline using the v2.0 approach by containerising the modules.
I broke down each step as follows:
├── Dockerfile
├── build.sh
├── src
│ ├── gen.py
│ ├── requirements.txt
from kfp import dsl
@dsl.container_component
def model_train():
    return dsl.ContainerSpec(
        image='mohitverma1688/model_train:v0.1.1',
        command=['/bin/sh'],
        args=['-c' ,' python3 train.py --num_epochs 10 --batch_size 32 --hidden_units 10 --train_dir /data/train --learning_rate 0.01 --test_dir /data/train --target_dir /data/models '])
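The component above hard-codes every hyperparameter in one shell string. A possible refinement, sketched here in plain Python, is to build the argument list from values so the same image can run with different settings. build_train_command is a hypothetical helper, the flag names are taken from the command above, and the paths are placeholders. My understanding is that dsl.ContainerSpec accepts command and args as lists of strings and that container component parameters can be placed in those lists, but treat that as an assumption to check against the KFP docs.

```python
def build_train_command(num_epochs, batch_size, hidden_units, learning_rate,
                        train_dir, test_dir, target_dir):
    """Return the train.py invocation as a list of strings, one token per item."""
    flags = {
        "num_epochs": num_epochs,
        "batch_size": batch_size,
        "hidden_units": hidden_units,
        "learning_rate": learning_rate,
        "train_dir": train_dir,
        "test_dir": test_dir,
        "target_dir": target_dir,
    }
    command = ["python3", "train.py"]
    for flag, value in flags.items():
        # Every value is converted to a string, as a container command line expects.
        command += [f"--{flag}", str(value)]
    return command


# Placeholder values for illustration only.
command = build_train_command(
    num_epochs=10, batch_size=32, hidden_units=10, learning_rate=0.01,
    train_dir="/data/train", test_dir="/data/test", target_dir="/data/models",
)
print(" ".join(command))
```

Passing a token list instead of a single shell string also avoids quoting problems when a path contains spaces.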
Upvotes: 0