VanDough
VanDough

Reputation: 41

Is it possible to output a list of artifacts of the same type using kubeflow pipelines?

I'm trying to output multiple confusion matrices from a kubeflow pipeline component, like in this example that only outputs one:

def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
):
    from xgboost import XGBClassifier
    import pandas as pd

    data = pd.read_csv(test_set.path)
    model = XGBClassifier()
    model.load_model(xgb_model.path)

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    from sklearn.metrics import roc_curve
    y_scores =  model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
        y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    from sklearn.metrics import confusion_matrix
    y_pred = model.predict(data.drop(columns=["target"]))

    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
            data.target, y_pred
       ).tolist(),  # .tolist() to convert np array to list.
    )

    xgb_model.metadata["test_score"] = float(score)
    smetrics.log_metric("score", float(score))

Do you know if it's possible to output multiple confusion matrices without having to define multiple Output args?

I can only think of defining the function as:

def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[List[ClassificationMetrics]],
    smetrics: Output[Metrics]
 ):

but that doesn't work because the list cannot call log_confusion_matrix

In their source code I didn't find much: https://github.com/kubeflow/pipelines/blob/55a2fb5c20011b01945c9867ddff0d39e9db1964/sdk/python/kfp/v2/components/types/artifact_types.py#L255-L256

Upvotes: 3

Views: 1790

Answers (1)

Kabilan Mohanraj
Kabilan Mohanraj

Reputation: 1916

With the current component specification, a component can only output a single object of ClassificationMetrics which can be visualized. Hence, even if log_confusion_matrix is called multiple times, only the last confusion matrix is visualized since the previous calls are overwritten.

As a workaround for your requirement, there are 3 things I would suggest.

  1. Use a for loop in the pipeline specification to generate multiple components which output one confusion matrix each. For example, each model can be run as a component and it’s confusion matrix in the respective components. Refer to the code below.
@dsl.pipeline(
    name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)

    for model in ["xgb_classifier_1.joblib", "xgb_classifier_2.joblib"]:
        eval_task = eval_model(model)
  1. Use the ParallelFor from the kfp.dsl package that would run multiple instances of the same component in parallel. Refer to the code below.
@dsl.pipeline(
    name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)

    with ParallelFor(["xgb_classifier_1.joblib", "xgb_classifier_2.joblib"]) as model:
        eval_task = eval_model(model)

I got similar results with workarounds 1) and 2).

  1. Use the markdown or HTML visualization as suggested in this documentation to visualize multiple confusion matrices in a single markdown or HTML artifact. The component code below is a markdown artifact that tabulates 3 confusion matrices. The code to convert a list to a markdown table was sourced from here.
@component(
    packages_to_install=['sklearn'],
    base_image='python:3.9',
)
def iris_sgdclassifier(
                      test_samples_fraction: float
                      ) -> dict:

    from sklearn import datasets, model_selection
    from sklearn.linear_model import SGDClassifier
    from sklearn.metrics import confusion_matrix

    iris_dataset = datasets.load_iris()
    train_x, test_x, train_y, test_y = model_selection.train_test_split(
        iris_dataset['data'], iris_dataset['target'], test_size=test_samples_fraction)

    classifier = SGDClassifier()

    confusion_matrices_dictionary = {}

    for cv in [3, 5, 7]: ## confusion matrices generated for multiple values of cross-validation splits
        classifier.fit(train_x, train_y)
        predictions = model_selection.cross_val_predict(classifier, train_x, train_y, cv=cv)
        confusion_matrices_dictionary["experiment_cv_"+str(cv)] = {"categories":['Setosa', 'Versicolour', 'Virginica'], "confusion_matrix":confusion_matrix(train_y, predictions).tolist()}

    return confusion_matrices_dictionary 
    ## an element in the dictionary => {'experiment_cv_3': {'categories': ['Setosa', 'Versicolour', 'Virginica'], 
    ## 'confusion_matrix': [[35, 0, 0], [16, 7, 19], [0, 0, 43]]}}


@component(
    packages_to_install=["numpy"],
    base_image='python:3.9',
)
def visualize_confusion_matrices(
                                confusion_matrices_dictionary: dict,
                                markdown_artifact: Output[Markdown]
                                ):
    
    import numpy as np

    for key in confusion_matrices_dictionary.keys():
        categories = confusion_matrices_dictionary[key]["categories"]
        confusion_matrix = confusion_matrices_dictionary[key]["confusion_matrix"]
        matrix_size = len(categories)+1
        table_struct = np.empty((matrix_size, matrix_size), dtype=object)

        table_struct[0,0] = "Categories"
        table_struct[0,1:] = categories
        table_struct[1:,0] = categories
        table_struct[1:,1:] = confusion_matrix

        ## code to convert list to markdown table
        markdown = "## "+key+"\n\n" + str("| ")

        for e in table_struct[0]:
            to_add = " " + str(e) + str(" |")
            markdown += to_add
        markdown += "\n"

        markdown += '|'
        for i in range(len(table_struct[0])):
            markdown += str("-------------- | ")
        markdown += "\n"

        for entry in table_struct[1:]:
            markdown += str("| ")
            for e in entry:
                to_add = str(e) + str(" | ")
                markdown += to_add
            markdown += "\n"

        with open(markdown_artifact.path, 'a') as f:
            f.write(markdown)


@dsl.pipeline(
    name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
    cms = visualize_confusion_matrices(iris_sgdclassifier_op.output)

Upvotes: 2

Related Questions