I am very new to Kubeflow and I am trying to build a containerized Python component with KFP. I have made some progress, but I don't know how to use OutputPath and Output in my pipeline flow, and I need some help here.
Below is my component; it returns a dictionary. I would like to use OutputPath or Output to turn this into a Dataset artifact which I can use later in the pipeline.
%%writefile src/components/model_train_cnn/model_train_component.py
from kfp import dsl
from kfp import compiler
from typing import Dict
from kfp.dsl import Dataset, Output, Artifact, OutputPath, InputPath


@dsl.component(base_image='mohitverma1688/model_train_component:v0.1',
               target_image='mohitverma1688/model_train_component:v0.20',
               packages_to_install=['pandas'])
def model_train(num_epochs: int,
                batch_size: int,
                hidden_units: int,
                learning_rate: float,
                train_dir: str,
                test_dir: str,
                model_name: str,
                model_dir: str,
                export_bucket: str = "modelbucket",
                ) -> Dict[str, list]:
    import os
    import json
    import pandas as pd
    import torch
    import data_setup, engine, model_builder, utils
    from torchvision import transforms

    # Setup hyperparameters
    NUM_EPOCHS = num_epochs
    BATCH_SIZE = batch_size
    HIDDEN_UNITS = hidden_units
    LEARNING_RATE = learning_rate
    MODEL_NAME = model_name
    MODEL_DIR = model_dir
    EXPORT_BUCKET = export_bucket

    # Setup directories
    TRAIN_DIR = train_dir
    TEST_DIR = test_dir

    # Setup target device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Create transforms
    data_transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor()
    ])

    # Create DataLoaders with help from data_setup.py
    train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
        train_dir=TRAIN_DIR,
        test_dir=TEST_DIR,
        transform=data_transform,
        batch_size=BATCH_SIZE
    )

    # Create model with help from model_builder.py
    model = model_builder.TinyVGG(
        input_shape=3,
        hidden_units=HIDDEN_UNITS,
        output_shape=len(class_names)
    ).to(device)

    # Set loss and optimizer
    loss_fn = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Start training with help from engine.py
    result = engine.train(model=model,
                          train_dataloader=train_dataloader,
                          test_dataloader=test_dataloader,
                          loss_fn=loss_fn,
                          optimizer=optimizer,
                          epochs=NUM_EPOCHS,
                          device=device)

    # Save the model with help from utils.py
    utils.save_model(model=model,
                     model_dir=MODEL_DIR,
                     model_name=MODEL_NAME + ".pth",
                     export_bucket=EXPORT_BUCKET)

    return result
This is what I want to expose using OutputPath or Output: I want to convert the returned dict into a DataFrame and store it as a pipeline output artifact for use in the next step. Locally it looks like this:
import pandas as pd
# `result` is the dict returned by model_train / engine.train above
model_0_df = pd.DataFrame(result)
model_0_df
test_acc test_loss train_acc train_loss
0 0.197917 1.099857 0.316406 1.100338
1 0.541667 1.090166 0.253906 1.104582
2 0.541667 1.090699 0.281250 1.101027
3 0.260417 1.093742 0.277344 1.097989
4 0.260417 1.089232 0.304688 1.097009
5 0.541667 1.083633 0.414062 1.094837
6 0.541667 1.078700 0.281250 1.110045
7 0.541667 1.081974 0.281250 1.100708
8 0.541667 1.087431 0.402344 1.096542
9 0.541667 1.086511 0.281250 1.097733
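
From what I understand of the KFP v2 docs, the component signature needs an extra Output[Dataset] parameter, and the component then writes a file to that artifact's .path. Below is a sketch of what I mean; it is untested, train_metrics is just a placeholder name I picked, and the result dict is stubbed out:

from kfp import dsl
from kfp.dsl import Dataset, Output


@dsl.component(base_image='mohitverma1688/model_train_component:v0.1',
               target_image='mohitverma1688/model_train_component:v0.20',
               packages_to_install=['pandas'])
def model_train(num_epochs: int,
                batch_size: int,
                hidden_units: int,
                learning_rate: float,
                train_dir: str,
                test_dir: str,
                model_name: str,
                model_dir: str,
                # artifact parameters take no default value, so this sits
                # before export_bucket, which has one
                train_metrics: Output[Dataset],
                export_bucket: str = "modelbucket"):
    import pandas as pd
    # ... training code as above, producing the metrics dict ...
    result = {"train_loss": [], "train_acc": [],
              "test_loss": [], "test_acc": []}  # stub for engine.train(...)
    # train_metrics.path is a local file path; KFP uploads whatever is
    # written there as a Dataset artifact when the task finishes
    pd.DataFrame(result).to_csv(train_metrics.path, index=False)

If that is right, I would not pass train_metrics when calling model_train() in the pipeline; KFP injects it, and the next task should be able to read it via task2.outputs['train_metrics'].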
Below is my pipeline:
#%%writefile pipeline.py
from kfp import kubernetes
from kfp import dsl
from kfp import compiler

BASE_PATH = "/data"
URL = "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip"
INPUT_BUCKET = "datanewbucket"
NUM_EPOCHS = 10
BATCH_SIZE = 32
HIDDEN_UNITS = 10
LEARNING_RATE = 0.01
MODEL_NAME = "cnn_tinyvg_v4"
MODEL_DIR = "/data/models"
EXPORT_BUCKET = "modeloutput"
TRAIN_DIR = "/data/train"
TEST_DIR = "/data/test"

from src.components.data_download.data_download_component import dataset_download
from src.components.model_train_cnn.model_train_component import model_train


@dsl.pipeline(name='CNN-TinyVG-Demo',
              description='This pipeline is a demo for training, evaluating and deploying a Convolutional Neural Network',
              display_name='Kubeflow-MlFLow-Demo')
def kubeflow_pipeline(base_path: str = BASE_PATH,
                      url: str = URL,
                      batch_size: int = BATCH_SIZE,
                      train_dir: str = TRAIN_DIR,
                      test_dir: str = TEST_DIR,
                      input_bucket: str = INPUT_BUCKET,
                      num_epochs: int = NUM_EPOCHS,
                      hidden_units: int = HIDDEN_UNITS,
                      learning_rate: float = LEARNING_RATE,
                      model_name: str = MODEL_NAME,
                      model_dir: str = MODEL_DIR,
                      export_bucket: str = EXPORT_BUCKET):
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name='kubeflow-pvc4',
        access_modes=['ReadWriteOnce'],
        size='500Mi',
        storage_class_name='standard',
    )

    task1 = dataset_download(base_path=base_path,
                             url=url,
                             input_bucket=input_bucket)
    task1.set_caching_options(True)

    task2 = model_train(batch_size=batch_size,
                        num_epochs=num_epochs,
                        train_dir=train_dir,
                        test_dir=test_dir,
                        hidden_units=hidden_units,
                        learning_rate=learning_rate,
                        model_name=model_name,
                        model_dir=model_dir,
                        export_bucket=export_bucket,
                        ).after(task1)
    task2.set_caching_options(False)

    # task3 = model_inference(data_dir=DATA_DIR).after(task2)
    # task3.set_caching_options(False)

    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    # kubernetes.mount_pvc(
    #     task3,
    #     pvc_name=pvc1.outputs['name'],
    #     mount_path='/data',
    # )


compiler.Compiler().compile(kubeflow_pipeline, 'kubeflow-demo.yaml')
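
On the pipeline side, I imagine the consuming step would look roughly like this. model_evaluate is a hypothetical component I made up just to illustrate, and this is untested:

from kfp import dsl
from kfp.dsl import Dataset, Input


@dsl.component(packages_to_install=['pandas'])
def model_evaluate(metrics: Input[Dataset]):
    import pandas as pd
    # Input[Dataset] resolves to a local path where KFP downloads the
    # artifact produced by the upstream task
    df = pd.read_csv(metrics.path)
    print(df.describe())

# inside kubeflow_pipeline, after task2:
# task3 = model_evaluate(metrics=task2.outputs['train_metrics'])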
My package versions:

kfp                2.7.0
kfp-kubernetes     1.2.0
kfp-pipeline-spec  0.3.0
kfp-server-api     2.0.5
I have tried many ways to return this dict as an output Dataset artifact, but I think my understanding is not correct.