Mohit Verma

Reputation: 43

How to create a Kubeflow component from existing, working Python scripts

I am very new to both AI and MLOps, so please forgive me if my question is basic. I am trying to learn Kubeflow, but there is a lot of information in the documentation, and the split between SDK versions <2 and >=2 adds to the complexity. I have Python scripts that train a TinyVGG CNN model and then evaluate it, and they work perfectly fine. I want to turn those scripts into a pipeline, but I am not getting how to create KFP components from those files. Please guide me on how to break these scripts into components and then use them in a pipeline. One of my main problems is how to pass a complex data type like torch.utils.data.DataLoader from one component to another. I have checked the Kubeflow documentation, but it only gives a simple example.

https://www.kubeflow.org/docs/components/pipelines/v2/components/containerized-python-components/
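From that page, my rough understanding is that components do not exchange live Python objects such as a DataLoader; they exchange small parameters (str, int, list) or file-based artifacts (Dataset, Model), and anything like a DataLoader has to be rebuilt inside each component. Is the pattern below the right idea? (This is only my own rough sketch, the component names and packages are made up by me.)

from kfp.dsl import Dataset, Input, Output, component

@component(base_image="python:3.10", packages_to_install=["requests"])
def prepare_data(image_data: Output[Dataset]):
    # Download/extract the images into the artifact's path; KFP stores this
    # directory and hands its location to downstream components.
    import os
    os.makedirs(image_data.path, exist_ok=True)
    # ... download and unzip pizza_steak_sushi into image_data.path ...

@component(base_image="python:3.10", packages_to_install=["torch", "torchvision"])
def train(image_data: Input[Dataset], batch_size: int):
    # Rebuild the DataLoader inside the component instead of passing it between steps.
    from torchvision import datasets, transforms
    from torch.utils.data import DataLoader
    transform = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])
    train_data = datasets.ImageFolder(image_data.path, transform=transform)
    train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    # ... training loop ...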

This is how my scripts setup looks.

.
├── __pycache__
│   ├── data_setup.cpython-310.pyc
│   ├── engine.cpython-310.pyc
│   ├── model_builder.cpython-310.pyc
│   └── utils.cpython-310.pyc
├── data_setup.py
├── engine.py
├── get_data.py
├── model_builder.py
├── predict.py
├── train.py
└── utils.py

To train, I simply run the train.py module with arguments:

python modular_ML_code/going_modular_argparse/train.py --num_epochs 10 --hidden_units=128 --train_dir=<path to train dir> --test_dir=<path to test_dir>

In my train.py file, I import the local modules like below:

import data_setup, engine, model_builder, utils
...

train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=data_transform,
    batch_size=BATCH_SIZE
)
...

%%writefile modular_ML_code/going_modular_argparse/train.py
"""
Trains a PyTorch image classification model using device-agnostic code.
"""

import os
import argparse
import torch
import data_setup, engine, model_builder, utils

from torchvision import transforms

#Create a parser

parser = argparse.ArgumentParser(description="Get some hyperparameters")

# Get an arg for num_epochs

parser.add_argument("--num_epochs", 
                     type=int,
                     help="the number of epochs to train for",
                     default=10)

# Get an arg for batch_size

parser.add_argument("--batch_size", 
                    default=32,
                    type=int,
                    help="number of samples per batch")

                    
# Get an arg for hidden_units

parser.add_argument("--hidden_units",
                    default=10,
                    type=int,
                    help="number of hidden units in hidden layers")


# Get an arg for learning_rate

parser.add_argument("--learning_rate",
                    default=0.001,
                    type=float,
                    help="learning rate use for model")

# Create an arg for the training directory

parser.add_argument("--train_dir",
                    default="modular_ML_code/data/pizza_steak_sushi/train",
                    type=str,
                    help="Directory file path to training data in standard image classification format")


# Create an arg for the testing directory

parser.add_argument("--test_dir",
                    default="modular_ML_code/data/pizza_steak_sushi/test",
                    type=str,
                    help="Directory file path to testing data in standard image classification format")



# Get out arguments from the parser

args = parser.parse_args()





# Setup hyperparameters
NUM_EPOCHS = args.num_epochs
BATCH_SIZE = args.batch_size
HIDDEN_UNITS = args.hidden_units
LEARNING_RATE = args.learning_rate


# Setup directories
train_dir = args.train_dir
test_dir = args.test_dir

# Setup target device
device = "cuda" if torch.cuda.is_available() else "cpu"

# Create transforms
data_transform = transforms.Compose([
  transforms.Resize((64, 64)),
  transforms.ToTensor()
])

# Create DataLoaders with help from data_setup.py
train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(
    train_dir=train_dir,
    test_dir=test_dir,
    transform=data_transform,
    batch_size=BATCH_SIZE
)

# Create model with help from model_builder.py
model = model_builder.TinyVGG(
    input_shape=3,
    hidden_units=HIDDEN_UNITS,
    output_shape=len(class_names)
).to(device)

# Set loss and optimizer
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),
                             lr=LEARNING_RATE)

# Start training with help from engine.py
engine.train(model=model,
             train_dataloader=train_dataloader,
             test_dataloader=test_dataloader,
             loss_fn=loss_fn,
             optimizer=optimizer,
             epochs=NUM_EPOCHS,
             device=device)

# Save the model with help from utils.py
utils.save_model(model=model,
                 target_dir="models",
                 model_name="05_going_modular_script_mode_tinyvgg_model.pth")

The output returns like this

0%|                                                    | 0/10 [00:00<?, ?it/s]Epoch: 1 | train_loss: 1.1460 | train_acc: 0.2891 | test_loss: 1.0858 | test_acc: 0.2708
 10%|████▍                                       | 1/10 [00:07<01:07,  7.45s/it]Epoch: 2 | train_loss: 1.0724 | train_acc: 0.2930 | test_loss: 1.0242 | test_acc: 0.3201
 20%|████████▊                                   | 2/10 [00:14<00:58,  7.32s/it]Epoch: 3 | train_loss: 1.0318 | train_acc: 0.5586 | test_loss: 1.0779 | test_acc: 0.3930
 30%|█████████████▏                              | 3/10 [00:22<00:51,  7.38s/it]Epoch: 4 | train_loss: 1.0128 | train_acc: 0.5000 | test_loss: 1.2437 | test_acc: 0.4034
 40%|█████████████████▌                          | 4/10 [00:29<00:43,  7.32s/it]Epoch: 5 | train_loss: 1.0905 | train_acc: 0.4336 | test_loss: 1.0099 | test_acc: 0.5852
 50%|██████████████████████                      | 5/10 [00:36<00:36,  7.30s/it]Epoch: 6 | train_loss: 0.9365 | train_acc: 0.6367 | test_loss: 1.0314 | test_acc: 0.4025
 60%|██████████████████████████▍                 | 6/10 [00:43<00:29,  7.26s/it]Epoch: 7 | train_loss: 0.9894 | train_acc: 0.5195 | test_loss: 1.0308 | test_acc: 0.4025
 70%|██████████████████████████████▊             | 7/10 [00:51<00:21,  7.30s/it]Epoch: 8 | train_loss: 1.0748 | train_acc: 0.5156 | test_loss: 1.0312 | test_acc: 0.3939
 80%|███████████████████████████████████▏        | 8/10 [00:58<00:14,  7.37s/it]Epoch: 9 | train_loss: 0.9672 | train_acc: 0.3867 | test_loss: 0.9279 | test_acc: 0.6250
 90%|███████████████████████████████████████▌    | 9/10 [01:05<00:07,  7.33s/it]Epoch: 10 | train_loss: 0.8184 | train_acc: 0.6758 | test_loss: 0.9775 | test_acc: 0.4233
100%|███████████████████████████████████████████| 10/10 [01:13<00:00,  7.31s/it]
[INFO] Saving model to: models/05_going_modular_script_mode_tinyvgg_model.pth

What I want is to break these steps into different components and create an MLOps pipeline. Could you please help?

Below is what I tried:-

import kfp
from kfp import dsl
from kfp import compiler
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics,
                        Metrics, component)


@component(
    base_image="python:3.10",
    packages_to_install=["boto3","requests","pathlib"]
)
def download_dataset(input_bucket: str, data_path: str):

    """Download the custom data set to the Kubeflow Pipelines volume to share it among all steps"""
    import os
    import zipfile
    import requests
    from pathlib import Path
    import boto3
    from botocore.client import Config
    
    s3 = boto3.client(
        "s3",
        endpoint_url="http://minio-service.kubeflow:9000",
        aws_access_key_id="minio",
        aws_secret_access_key="minio123",
        config=Config(signature_version="s3v4"),
    )
    # Create the input bucket if it does not yet exist
    response = s3.list_buckets()
    input_bucket_exists = False
    for bucket in response["Buckets"]:
        if bucket["Name"] == input_bucket:
            input_bucket_exists = True
            
    if not input_bucket_exists:
        s3.create_bucket(ACL="public-read-write", Bucket=input_bucket)

    # Save the zip file to the S3 input_bucket
    data_path = Path(data_path)
    
    if data_path.is_dir():
      print(f"{data_path} directory exists.")
    else:
      print(f"Did not find {data_path} directory, creating one...")
      data_path.mkdir(parents=True,exist_ok=True)

    # Download pizza, steak and sushi data
    with open(data_path / "pizza_steak_sushi.zip", "wb") as f:
        request = requests.get("https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip")
        print("Downloading pizza, steak, sushi data...")
        f.write(request.content)

    # Upload the downloaded files to the input bucket (outside the with block,
    # so the zip file is fully written and closed before uploading)
    for root, dirs, files in os.walk(data_path):
        for filename in files:
            local_path = os.path.join(root, filename)
            s3.upload_file(
                local_path,
                input_bucket,
                f"{local_path}",
                ExtraArgs={"ACL": "public-read"},
            )

@component(
    base_image="python:3.10",
    packages_to_install=["torch","torchvision","boto3","pathlib","requests"]
)
def process_data(
    input_bucket: str,
    data_path: str,
    train_dir: str, 
    test_dir: str, 
    batch_size: int, 
    num_workers: int=0,
     ):
    
  
    import os
    from torchvision import datasets, transforms
    from torch.utils.data import DataLoader
    import boto3
    import zipfile
    from pathlib import Path
    from botocore.client import Config
    import requests

  
    def create_dataloaders(
      train_dir: str, 
      test_dir: str, 
      transform: transforms.Compose, 
      batch_size: int, 
      num_workers: int=num_workers
        
      ):
      
      # Use ImageFolder to create dataset(s)
      train_data = datasets.ImageFolder(train_dir, transform=transform)
      test_data = datasets.ImageFolder(test_dir, transform=transform)
  
      # Get class names
      class_names = train_data.classes
  
      # Turn images into data loaders
      train_dataloader = DataLoader(
          train_data,
          batch_size=batch_size,
          shuffle=True,
          num_workers=num_workers,
          pin_memory=True,
      )
      test_dataloader = DataLoader(
          test_data,
          batch_size=batch_size,
          shuffle=False, # don't need to shuffle test data
          num_workers=num_workers,
          pin_memory=True,
      )
  
      return train_dataloader, test_dataloader, class_names
  
    data_transform =  transforms.Compose([
                  transforms.Resize((64, 64)),
                  transforms.ToTensor()
                  ])
    
    data_path = Path(data_path)
    image_path = data_path / "pizza_steak_sushi"
    
    if image_path.is_dir():
      print(f"{image_path} directory exists.")
    else:
      print(f"Did not find {image_path} directory, creating one...")
      image_path.mkdir(parents=True,exist_ok=True)

    with open(data_path/ "pizza_steak_sushi.zip", "wb") as f:
      request = requests.get("https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip")
      print("Downloading pizza, steak, sushi data...")
      f.write(request.content)

    #Unzip the data

    with zipfile.ZipFile(data_path/"pizza_steak_sushi.zip", "r") as zip_ref:
      print("Unzipping pizza, steak, sushi data...")
      zip_ref.extractall(image_path)

    # Remove the zip file 
    os.remove(data_path / "pizza_steak_sushi.zip")
        
   

    # Create DataLoaders with help from data_setup.py
    train_dataloader, test_dataloader, class_names = create_dataloaders(
                                                            train_dir=train_dir,
                                                            test_dir=test_dir,
                                                            transform=data_transform,
                                                            batch_size=batch_size
                                                            )

  

from kfp import compiler

INPUT_BUCKET = "inputdata"
DATA_PATH = "dataset"
TRAIN_DIR = "dataset/pizza_steak_sushi/train"
TEST_DIR = "dataset/pizza_steak_sushi/test"
BATCH_SIZE = 32
HIDDEN_UNITS = 10

@dsl.pipeline(
        name="End-to-End-MNIST",
        description="A sample pipeline to demonstrate multi-step model training, evaluation, export, and serving",
    )   
    
def my_pipeline(input_bucket: str = INPUT_BUCKET,
                 data_path: str = DATA_PATH,
                 train_dir: str = TRAIN_DIR,
                 test_dir: str = TEST_DIR,
                 batch_size: int = BATCH_SIZE,
                 hidden_units: int = HIDDEN_UNITS
                ):
    
    import_data_op =  download_dataset(input_bucket=INPUT_BUCKET,
                     data_path=DATA_PATH)

   
    create_data_loader_op = process_data(input_bucket=INPUT_BUCKET,
                                         data_path=DATA_PATH,
                                         train_dir=TRAIN_DIR,
                                         test_dir=TEST_DIR,
                                         batch_size=BATCH_SIZE,                                   
                                         )
    create_model_op = train_model(hidden_units=HIDDEN_UNITS)
    
    create_data_loader_op.after(import_data_op)
    create_model_op.after(create_data_loader_op)
    

if __name__ == '__main__':
    compiler.Compiler().compile(my_pipeline, 'pipeline2.yaml')
from kfp.client import Client

client = Client(host='http://localhost:8002')
run = client.create_run_from_pipeline_package(
    'pipeline2.yaml',
)

Now I want to pass this class_names value to the next step, which creates the model:

@component(
    base_image="python:3.10",
    packages_to_install=["torch","torchvision","boto3","pathlib","requests"]
)

def train_model(
    hidden_units: int, 
     ):
  
  import torch
  from torch import nn 
  
  class TinyVGG(nn.Module):

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, 
                      out_channels=hidden_units, 
                      kernel_size=3, 
                      stride=1, 
                      padding=0),  
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, 
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,
                          stride=2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=0),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            # Where did this in_features shape come from? 
            # It's because each layer of our network compresses and changes the shape of our inputs data.
            nn.Linear(in_features=hidden_units*13*13,
                      out_features=output_shape)
        )
  
    def forward(self, x: torch.Tensor):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.classifier(x)
        return x
        # return self.classifier(self.conv_block_2(self.conv_block_1(x))) # <- leverage the benefits of operator fusion
  device = "cuda" if torch.cuda.is_available() else "cpu" 
  model = TinyVGG(
      input_shape=3,
      hidden_units=hidden_units,
      output_shape=len(class_names)
  ).to(device)
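At the moment class_names is not defined inside train_model, so this obviously fails. My current guess is that class_names should be returned from the data component as a small list output and passed into the model component as a normal parameter, roughly like this (again just my own sketch, I am not sure this is the intended way):

from typing import List
from kfp.dsl import component

@component(base_image="python:3.10", packages_to_install=["torch", "torchvision"])
def process_data(train_dir: str) -> List[str]:
    # Build the dataset here and return only the small, serialisable piece
    # (the class names); the DataLoader itself stays inside this component.
    from torchvision import datasets
    train_data = datasets.ImageFolder(train_dir)
    return train_data.classes

@component(base_image="python:3.10", packages_to_install=["torch", "torchvision"])
def train_model(hidden_units: int, class_names: List[str]):
    # len(class_names) gives output_shape without shipping a DataLoader between steps
    output_shape = len(class_names)
    # ... build TinyVGG(input_shape=3, hidden_units=hidden_units, output_shape=output_shape) and train ...

# and inside the pipeline the value would be wired with .output:
# data_task = process_data(train_dir=TRAIN_DIR)
# model_task = train_model(hidden_units=HIDDEN_UNITS, class_names=data_task.output)

Is this the right approach, or should I be using artifacts for this as well?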

Upvotes: 0

Views: 314

Answers (1)

Mohit Verma

Reputation: 43

I was able to create a pipeline using the v2 containerized-component method, by containerising each module.

I have broken each step down into its own container image.

.
├── Dockerfile
├── build.sh
└── src
    ├── gen.py
    └── requirements.txt
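The Dockerfile for a step is roughly like the sketch below (simplified from my actual file). The pipeline step supplies the command, so the image only needs the code and its dependencies:

FROM python:3.10-slim

WORKDIR /app

# install the step's dependencies into the image
COPY src/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# copy the module code for this step
COPY src/ .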

from kfp import dsl

@dsl.container_component
def model_train():
    return dsl.ContainerSpec(
        image='mohitverma1688/model_train:v0.1.1',
        command=['/bin/sh'],
        args=['-c',
              'python3 train.py --num_epochs 10 --batch_size 32 --hidden_units 10 '
              '--train_dir /data/train --learning_rate 0.01 --test_dir /data/train '
              '--target_dir /data/models'],
    )
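Each module becomes a container component like this, and the components are wired together in a pipeline the same way as before, for example (simplified; the /data paths point at storage that the steps share):

from kfp import dsl, compiler

@dsl.pipeline(
    name="cnn-training-pipeline",
    description="Containerised training step; further steps can be chained with .after()",
)
def my_pipeline():
    train_task = model_train()   # each containerised module becomes one task
    # e.g. evaluate_task = model_evaluate(); evaluate_task.after(train_task)

compiler.Compiler().compile(my_pipeline, 'pipeline.yaml')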

Upvotes: 0
