LLTeng
LLTeng

Reputation: 395

Kubeflow Pipeline RuntimeError while running multiple trainers with tfx

I like to have multiple trainers running simultaneously using the same ExampleGen, Schema and Transform. Below is my code adding extra components as trainer2 evaluator2 and pusher2. But I've been getting the following error, and I'm not sure how to fix them. Can you please advise and thanks in advance!

Error: RuntimeError: Duplicated component_id Trainer for component type tfx.components.trainer.component.Trainer

def create_pipeline(
    pipeline_name: Text,
    pipeline_root: Text,
    data_path: Text,
    preprocessing_fn: Text,
    run_fn: Text,
    run_fn2: Text,
    train_args: trainer_pb2.TrainArgs,
    train_args2: trainer_pb2.TrainArgs,
    eval_args: trainer_pb2.EvalArgs,
    eval_args2: trainer_pb2.EvalArgs,
    eval_accuracy_threshold: float,
    eval_accuracy_threshold2: float,
    serving_model_dir: Text,
    serving_model_dir2: Text,
    metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
    beam_pipeline_args: Optional[List[Text]] = None,
    ai_platform_training_args: Optional[Dict[Text, Text]] = None,
    ai_platform_serving_args: Optional[Dict[Text, Any]] = None,
) -> pipeline.Pipeline:
  """Implements the custom pipeline with TFX."""

  components = []
  example_gen = CsvExampleGen(input=external_input(data_path))
  components.append(example_gen)

schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)
  components.append(schema_gen)

transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      preprocessing_fn=preprocessing_fn)
  components.append(transform)

trainer_args = {
      'run_fn': run_fn,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }

  trainer = Trainer(**trainer_args)
  components.append(trainer)

  trainer_args2 = {
      'run_fn': run_fn2,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args2,
      'eval_args': eval_args2,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }

  trainer2 = Trainer(**trainer_args2)
  components.append(trainer2)

  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver)

  model_resolver2 = ResolverNode(
      instance_name='latest_blessed_model_resolver2',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver2)



  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      #baseline_model=model_resolver.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config)
  components.append(evaluator)

  evaluator2 = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer2.outputs['model'],
      baseline_model=model_resolver2.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config2)
  components.append(evaluator2)

  pusher_args = {
      'model':
          trainer.outputs['model'],
      'model_blessing':
          evaluator.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir)),
  }

  pusher = Pusher(**pusher_args)  
  components.append(pusher)

  pusher_args2 = {
      'model':
          trainer2.outputs['model'],
      'model_blessing':
          evaluator2.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir2)),
  }

  pusher2 = Pusher(**pusher_args2)  # pylint: disable=unused-variable
  components.append(pusher2)

Upvotes: 3

Views: 551

Answers (2)

pirateofebay
pirateofebay

Reputation: 1090

The instance_name parameter has been deprecated. Instead, use with_id().

Example:

model_resolver2 = ResolverNode(
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing)
).with_id("latest_blessed_model_resolver2")
    
components.append(model_resolver2)

Upvotes: 1

LLTeng
LLTeng

Reputation: 395

The above issue has resolved by adding "instance_name" in each pipeline component to identify the unique name.

Upvotes: 3

Related Questions