Mohit Verma

Reputation: 43

How to use Output/OutputPath in Kubeflow v2.0?

I am very new to Kubeflow and I am trying to create a component using containerized Python components. I was able to make some progress, but I don't know how to use OutputPath and Output in my pipeline flow. I need some help here.

Below is my component; it returns a dictionary. I would like to use OutputPath or Output to convert this into a dataset artifact which I can use later in the pipeline.

%%writefile src/components/model_train_cnn/model_train_component.py
from kfp import dsl
from kfp import compiler
from typing import Dict
from kfp.dsl import Dataset, Output, Artifact, OutputPath, InputPath

@dsl.component(base_image='mohitverma1688/model_train_component:v0.1',
               target_image='mohitverma1688/model_train_component:v0.20',
               packages_to_install=['pandas']
               )
def model_train(num_epochs: int,
                batch_size: int,
                hidden_units: int,
                learning_rate: float,
                train_dir: str,
                test_dir: str,
                model_name: str,
                model_dir: str,
                export_bucket: str = "modelbucket",
                ) -> Dict[str, list]:

    import os
    import json
    import pandas as pd
    import torch
    import data_setup, engine, model_builder, utils

    from torchvision import transforms

    # Setup hyperparameters
    NUM_EPOCHS = num_epochs
    BATCH_SIZE = batch_size
    HIDDEN_UNITS = hidden_units
    LEARNING_RATE = learning_rate
    MODEL_NAME = model_name
    MODEL_DIR = model_dir
    EXPORT_BUCKET = export_bucket

    # Setup directories
    TRAIN_DIR = train_dir
    TEST_DIR = test_dir

    # Setup target device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Create transforms
    data_transform = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.ToTensor()
    ])

    # Create DataLoaders with help from data_setup.py
    train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
        train_dir=TRAIN_DIR,
        test_dir=TEST_DIR,
        transform=data_transform,
        batch_size=BATCH_SIZE
    )

    # Create model with help from model_builder.py
    model = model_builder.TinyVGG(
        input_shape=3,
        hidden_units=HIDDEN_UNITS,
        output_shape=len(class_names)
    ).to(device)

    # Set loss and optimizer
    loss_fn = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(),
                                 lr=LEARNING_RATE)

    # Start training with help from engine.py
    result = engine.train(model=model,
                          train_dataloader=train_dataloader,
                          test_dataloader=test_dataloader,
                          loss_fn=loss_fn,
                          optimizer=optimizer,
                          epochs=NUM_EPOCHS,
                          device=device)

    # Save the model with help from utils.py
    utils.save_model(model=model,
                     model_dir=MODEL_DIR,
                     model_name=MODEL_NAME + ".pth",
                     export_bucket=EXPORT_BUCKET)

    return result


This is what I want to return using OutputPath or Output: I want to convert this dict to a DataFrame and store it as a pipeline output for use in the next step.

import pandas as pd
model_0_df = pd.DataFrame(x)  # x is the results dict returned by my component
model_0_df

    test_acc    test_loss   train_acc   train_loss
0   0.197917    1.099857    0.316406    1.100338
1   0.541667    1.090166    0.253906    1.104582
2   0.541667    1.090699    0.281250    1.101027
3   0.260417    1.093742    0.277344    1.097989
4   0.260417    1.089232    0.304688    1.097009
5   0.541667    1.083633    0.414062    1.094837
6   0.541667    1.078700    0.281250    1.110045
7   0.541667    1.081974    0.281250    1.100708
8   0.541667    1.087431    0.402344    1.096542
9   0.541667    1.086511    0.281250    1.097733
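
From the docs my understanding (which may be wrong) is that in KFP v2 I should declare an Output[Dataset] parameter in the component signature and write the DataFrame to its .path. A stripped-down sketch of what I have been trying is below; train_metrics is just a name I picked, and the hard-coded dict stands in for the real training results:

from kfp import dsl
from kfp.dsl import Dataset, Output

@dsl.component(packages_to_install=['pandas'])
def model_train(num_epochs: int,
                train_metrics: Output[Dataset]):
    import pandas as pd
    # stand-in for the dict returned by engine.train in my real component
    result = {"train_loss": [1.10, 1.09], "test_loss": [1.09, 1.08]}
    df = pd.DataFrame(result)
    # write to the system-provided path; KFP should record this file as a Dataset artifact
    df.to_csv(train_metrics.path, index=False)

If this is the right direction, I still don't understand how the call site in the pipeline changes, since train_metrics is not something I pass in myself.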

Below is my pipeline

#%%writefile pipeline.py

from kfp import kubernetes
from kfp import dsl
from kfp import compiler

BASE_PATH = "/data"
URL = "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip"
INPUT_BUCKET = "datanewbucket"
NUM_EPOCHS = 10
BATCH_SIZE = 32
HIDDEN_UNITS = 10
LEARNING_RATE = 0.01
MODEL_NAME = "cnn_tinyvg_v4"
MODEL_DIR = "/data/models"
EXPORT_BUCKET = "modeloutput"
TRAIN_DIR = "/data/train"
TEST_DIR = "/data/test"

from src.components.data_download.data_download_component import dataset_download
from src.components.model_train_cnn.model_train_component import model_train

@dsl.pipeline(name='CNN-TinyVG-Demo',
              description='This pipeline is a demo for training, evaluating and deploying a Convolutional Neural Network',
              display_name='Kubeflow-MlFLow-Demo')
def kubeflow_pipeline(base_path: str = BASE_PATH,
                      url: str = URL,
                      batch_size: int = BATCH_SIZE,
                      train_dir: str = TRAIN_DIR,
                      test_dir: str = TEST_DIR,
                      input_bucket: str = INPUT_BUCKET,
                      num_epochs: int = NUM_EPOCHS,
                      hidden_units: int = HIDDEN_UNITS,
                      learning_rate: float = LEARNING_RATE,
                      model_name: str = MODEL_NAME,
                      model_dir: str = MODEL_DIR,
                      export_bucket: str = EXPORT_BUCKET
                      ):
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name='kubeflow-pvc4',
        access_modes=['ReadWriteOnce'],
        size='500Mi',
        storage_class_name='standard',
    )
    task1 = dataset_download(base_path=base_path,
                            url=url,
                            input_bucket=input_bucket)
    task1.set_caching_options(True)
    task2 = model_train(batch_size=batch_size,
                        num_epochs=num_epochs,
                        train_dir=train_dir,
                        test_dir=test_dir,
                        hidden_units=hidden_units,
                        learning_rate=learning_rate,
                        model_name=model_name,
                        model_dir=model_dir,
                        export_bucket=export_bucket,
                        ).after(task1)
    task2.set_caching_options(False)
 #   task3 = model_inference(data_dir=DATA_DIR).after(task2)
 #   task3.set_caching_options(False)
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
 #   kubernetes.mount_pvc(
 #       task3,
 #       pvc_name=pvc1.outputs['name'],
 #       mount_path='/data',
 #   )

compiler.Compiler().compile(kubeflow_pipeline, 'kubeflow-demo.yaml')

Package versions:

kfp                       2.7.0
kfp-kubernetes            1.2.0
kfp-pipeline-spec         0.3.0
kfp-server-api            2.0.5

I have tried many ways to return this dict as an output dataset artifact, but I think my understanding is not correct. One of my attempts at the consuming side is sketched below.
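
For completeness, this is how I assumed the next step would consume the artifact; model_inference here is a stand-in for my commented-out task3, and the outputs key is my guess:

from kfp import dsl
from kfp.dsl import Dataset, Input

@dsl.component(packages_to_install=['pandas'])
def model_inference(train_metrics: Input[Dataset]):
    import pandas as pd
    # read back the CSV that the upstream step wrote to its Output[Dataset]
    df = pd.read_csv(train_metrics.path)
    print(df.head())

# inside kubeflow_pipeline, wiring the artifact from task2:
# task3 = model_inference(train_metrics=task2.outputs['train_metrics']).after(task2)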

Upvotes: 0

Views: 180

Answers (0)
