Reputation: 395
I like to have multiple trainers running simultaneously using the same ExampleGen, Schema and Transform. Below is my code adding extra components as trainer2 evaluator2 and pusher2. But I've been getting the following error, and I'm not sure how to fix them. Can you please advise and thanks in advance!
Error: RuntimeError: Duplicated component_id Trainer for component type tfx.components.trainer.component.Trainer
def create_pipeline(
pipeline_name: Text,
pipeline_root: Text,
data_path: Text,
preprocessing_fn: Text,
run_fn: Text,
run_fn2: Text,
train_args: trainer_pb2.TrainArgs,
train_args2: trainer_pb2.TrainArgs,
eval_args: trainer_pb2.EvalArgs,
eval_args2: trainer_pb2.EvalArgs,
eval_accuracy_threshold: float,
eval_accuracy_threshold2: float,
serving_model_dir: Text,
serving_model_dir2: Text,
metadata_connection_config: Optional[
metadata_store_pb2.ConnectionConfig] = None,
beam_pipeline_args: Optional[List[Text]] = None,
ai_platform_training_args: Optional[Dict[Text, Text]] = None,
ai_platform_serving_args: Optional[Dict[Text, Any]] = None,
) -> pipeline.Pipeline:
"""Implements the custom pipeline with TFX."""
components = []
example_gen = CsvExampleGen(input=external_input(data_path))
components.append(example_gen)
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False)
components.append(schema_gen)
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
preprocessing_fn=preprocessing_fn)
components.append(transform)
trainer_args = {
'run_fn': run_fn,
'transformed_examples': transform.outputs['transformed_examples'],
'schema': schema_gen.outputs['schema'],
'transform_graph': transform.outputs['transform_graph'],
'train_args': train_args,
'eval_args': eval_args,
'custom_executor_spec':
executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
}
trainer = Trainer(**trainer_args)
components.append(trainer)
trainer_args2 = {
'run_fn': run_fn2,
'transformed_examples': transform.outputs['transformed_examples'],
'schema': schema_gen.outputs['schema'],
'transform_graph': transform.outputs['transform_graph'],
'train_args': train_args2,
'eval_args': eval_args2,
'custom_executor_spec':
executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
}
trainer2 = Trainer(**trainer_args2)
components.append(trainer2)
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
components.append(model_resolver)
model_resolver2 = ResolverNode(
instance_name='latest_blessed_model_resolver2',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
components.append(model_resolver2)
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
#baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config)
components.append(evaluator)
evaluator2 = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer2.outputs['model'],
baseline_model=model_resolver2.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config2)
components.append(evaluator2)
pusher_args = {
'model':
trainer.outputs['model'],
'model_blessing':
evaluator.outputs['blessing'],
'push_destination':
pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir)),
}
pusher = Pusher(**pusher_args)
components.append(pusher)
pusher_args2 = {
'model':
trainer2.outputs['model'],
'model_blessing':
evaluator2.outputs['blessing'],
'push_destination':
pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir2)),
}
pusher2 = Pusher(**pusher_args2) # pylint: disable=unused-variable
components.append(pusher2)
Upvotes: 3
Views: 551
Reputation: 1090
The instance_name
parameter has been deprecated. Instead, use with_id()
.
Example:
model_resolver2 = ResolverNode(
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing)
).with_id("latest_blessed_model_resolver2")
components.append(model_resolver2)
Upvotes: 1
Reputation: 395
The above issue has resolved by adding "instance_name" in each pipeline component to identify the unique name.
Upvotes: 3