Reputation: 1
from typing import Dict
from kfp.dsl import component, Input, Artifact
@component
def best_model(model_metrics_jsons: Dict[str, Input[Artifact]]):
import os
for model_name, metrics_input in model_metrics_jsons.items():
metrics_json_path = metrics_input.path
print(f"Model: {model_name}")
print(f"Metrics JSON Path: {metrics_json_path}")
if os.path.exists(metrics_json_path):
with open(metrics_json_path, 'r') as file:
metrics_json = file.read()
print(f"Metrics JSON Content: {metrics_json}")
else:
print(f"Metrics JSON Path does not exist: {metrics_json_path}")
from kfp import dsl
@dsl.pipeline(name='dynamic-datasets-pipeline')
def dynamic_datasets_pipeline():
# Example components
preprocess_task = preprocess_data() # Your preprocessing component
evaluate_model_tasks = evaluate_models() # Your evaluation tasks
# Create the dictionary of model metrics
model_metrics_dict: Dict[str, Input[Artifact]] = {
selected_model_names[i]: evaluate_model_tasks[i].outputs['model_metrics_json']
for i in range(len(selected_model_names))
}
# Call the best_model component with the dictionary
best_model_task = best_model(model_metrics_jsons=model_metrics_dict)
I suspect there may be an issue with how the dictionary model_metrics_dict is being passed to the best_model component or how the components are connected. Can someone help me identify the issue or suggest a solution?
Upvotes: 0
Views: 33