Reputation: 41
I'm trying to output multiple confusion matrices from a Kubeflow Pipelines component, like in this example that outputs only one:
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
):
    from xgboost import XGBClassifier
    import pandas as pd

    data = pd.read_csv(test_set.path)
    model = XGBClassifier()
    model.load_model(xgb_model.path)

    score = model.score(
        data.drop(columns=["target"]),
        data.target,
    )

    from sklearn.metrics import roc_curve
    y_scores = model.predict_proba(data.drop(columns=["target"]))[:, 1]
    fpr, tpr, thresholds = roc_curve(
        y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

    from sklearn.metrics import confusion_matrix
    y_pred = model.predict(data.drop(columns=["target"]))
    metrics.log_confusion_matrix(
        ["False", "True"],
        confusion_matrix(data.target, y_pred).tolist(),  # .tolist() converts the numpy array to a list
    )

    xgb_model.metadata["test_score"] = float(score)
    smetrics.log_metric("score", float(score))
Do you know if it's possible to output multiple confusion matrices without having to define multiple Output args?
I can only think of defining the function as:
def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics: Output[List[ClassificationMetrics]],
    smetrics: Output[Metrics]
):
but that doesn't work, because a list has no log_confusion_matrix method.
I didn't find much in their source code either: https://github.com/kubeflow/pipelines/blob/55a2fb5c20011b01945c9867ddff0d39e9db1964/sdk/python/kfp/v2/components/types/artifact_types.py#L255-L256
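For completeness, the verbose alternative I'm trying to avoid would be to declare one output argument per matrix (metrics_a and metrics_b are just placeholder names):

def eval_model(
    test_set: Input[Dataset],
    xgb_model: Input[Model],
    metrics_a: Output[ClassificationMetrics],  # one output arg per confusion matrix
    metrics_b: Output[ClassificationMetrics],
    smetrics: Output[Metrics]
):
    ...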
Upvotes: 3
Views: 1790
Reputation: 1916
With the current component specification, a component can only output a single ClassificationMetrics object that can be visualized. Hence, even if log_confusion_matrix is called multiple times, only the last confusion matrix is visualized, since each call overwrites the previous one.
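For instance, with a single ClassificationMetrics output, only the second matrix below would be visualized (cm_model_a and cm_model_b stand in for two hypothetical numpy confusion matrices):

# both calls write to the same artifact's metadata,
# so the second call overwrites the first
metrics.log_confusion_matrix(["False", "True"], cm_model_a.tolist())
metrics.log_confusion_matrix(["False", "True"], cm_model_b.tolist())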
As a workaround for your requirement, I would suggest three approaches.
1) Use a for loop in the pipeline specification to generate multiple components, each of which outputs one confusion matrix. For example, each model can be evaluated in its own component, and each component logs that model's confusion matrix. Refer to the code below, and the sketch of eval_model after it.

@dsl.pipeline(
    name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
    for model in ["xgb_classifier_1.joblib", "xgb_classifier_2.joblib"]:
        eval_task = eval_model(model)
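A minimal sketch of what eval_model could look like in this setup; the model_file parameter, the joblib loading, and the test-data path are assumptions to adapt to your storage layout:

@component(
    packages_to_install=['scikit-learn', 'pandas', 'joblib'],
    base_image='python:3.9',
)
def eval_model(
    model_file: str,
    metrics: Output[ClassificationMetrics]
):
    import joblib
    import pandas as pd
    from sklearn.metrics import confusion_matrix

    model = joblib.load(model_file)           # hypothetical: models serialized with joblib
    data = pd.read_csv('/data/test_set.csv')  # placeholder test-data location
    y_pred = model.predict(data.drop(columns=['target']))

    # each task logs exactly one confusion matrix,
    # so every loop iteration gets its own visualization
    metrics.log_confusion_matrix(
        ['False', 'True'],
        confusion_matrix(data.target, y_pred).tolist(),
    )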
2) Use ParallelFor from the kfp.dsl package, which runs multiple instances of the same component in parallel. Refer to the code below.

@dsl.pipeline(
    name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
    with ParallelFor(["xgb_classifier_1.joblib", "xgb_classifier_2.joblib"]) as model:
        eval_task = eval_model(model)
I got similar results with workarounds 1) and 2).
3) Generate all the confusion matrices in one component, pass them to a second component, and render them together as a single Markdown artifact. I have modified the example code available here.

@component(
    packages_to_install=['scikit-learn'],
    base_image='python:3.9',
)
def iris_sgdclassifier(
    test_samples_fraction: float
) -> dict:
    from sklearn import datasets, model_selection
    from sklearn.linear_model import SGDClassifier
    from sklearn.metrics import confusion_matrix

    iris_dataset = datasets.load_iris()
    train_x, test_x, train_y, test_y = model_selection.train_test_split(
        iris_dataset['data'], iris_dataset['target'], test_size=test_samples_fraction)

    classifier = SGDClassifier()
    confusion_matrices_dictionary = {}
    for cv in [3, 5, 7]:  ## confusion matrices generated for multiple values of cross-validation splits
        classifier.fit(train_x, train_y)
        predictions = model_selection.cross_val_predict(classifier, train_x, train_y, cv=cv)
        confusion_matrices_dictionary["experiment_cv_" + str(cv)] = {
            "categories": ['Setosa', 'Versicolour', 'Virginica'],
            "confusion_matrix": confusion_matrix(train_y, predictions).tolist(),
        }
    return confusion_matrices_dictionary

## an element in the dictionary =>
## {'experiment_cv_3': {'categories': ['Setosa', 'Versicolour', 'Virginica'],
##                      'confusion_matrix': [[35, 0, 0], [16, 7, 19], [0, 0, 43]]}}
@component(
    packages_to_install=["numpy"],
    base_image='python:3.9',
)
def visualize_confusion_matrices(
    confusion_matrices_dictionary: dict,
    markdown_artifact: Output[Markdown]
):
    import numpy as np

    for key in confusion_matrices_dictionary.keys():
        categories = confusion_matrices_dictionary[key]["categories"]
        confusion_matrix = confusion_matrices_dictionary[key]["confusion_matrix"]

        ## build an (n+1) x (n+1) table: header row and column plus the matrix itself
        matrix_size = len(categories) + 1
        table_struct = np.empty((matrix_size, matrix_size), dtype=object)
        table_struct[0, 0] = "Categories"
        table_struct[0, 1:] = categories
        table_struct[1:, 0] = categories
        table_struct[1:, 1:] = confusion_matrix

        ## code to convert the table to a markdown table
        markdown = "## " + key + "\n\n" + "| "
        for e in table_struct[0]:
            markdown += " " + str(e) + " |"
        markdown += "\n|"
        for i in range(len(table_struct[0])):
            markdown += "-------------- | "
        markdown += "\n"
        for entry in table_struct[1:]:
            markdown += "| "
            for e in entry:
                markdown += str(e) + " | "
            markdown += "\n"

        ## append so every matrix ends up in the same Markdown artifact
        with open(markdown_artifact.path, 'a') as f:
            f.write(markdown)
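For a single dictionary entry (using the sample values from the comment above), the generated Markdown looks roughly like this:

## experiment_cv_3

|  Categories | Setosa | Versicolour | Virginica |
|-------------- | -------------- | -------------- | -------------- |
| Setosa | 35 | 0 | 0 |
| Versicolour | 16 | 7 | 19 |
| Virginica | 0 | 0 | 43 |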
@dsl.pipeline(
    name='metrics-visualization-pipeline')
def metrics_visualization_pipeline():
    iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
    cms = visualize_confusion_matrices(iris_sgdclassifier_op.output)
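In case it helps, a minimal sketch of compiling the pipeline, assuming the KFP v2 SDK used in the snippets above:

from kfp.v2 import compiler

# compile the pipeline into a job spec that a KFP v2 backend
# (e.g. Vertex AI Pipelines) can run
compiler.Compiler().compile(
    pipeline_func=metrics_visualization_pipeline,
    package_path='metrics_visualization_pipeline.json',
)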
Upvotes: 2