Missaratiskhona

Reputation: 143

Testing a Pipeline containing ValueProviders

I created a pipeline that uses ValueProviders so that I can run it as a template. However, I can't figure out how to supply ValueProviders when testing the pipeline. I can't pass plain values in the test because my PTransforms expect ValueProviders.

Upvotes: 2

Views: 763

Answers (2)

Missaratiskhona

Reputation: 143

I set default values for the ValueProviders in my pipeline options:

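from apache_beam.options.pipeline_options import PipelineOptions

class MyPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Register a runtime (template) parameter with a default value.
        parser.add_value_provider_argument('--variable',
                                           type=float,
                                           dest='variable',
                                           default=5)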

So that when I call MyPipelineOptions in the test file, it automatically uses the default values.

Upvotes: 0

DrDecay

Reputation: 59

I do not know about Python, but in Java you can use StaticValueProvider. For example, if you have the following options interface:

interface BaseOptions extends DataflowPipelineOptions {
    void setSource(ValueProvider<String> source);
    ValueProvider<String> getSource();
}

Then you may use ValueProvider.StaticValueProvider.of(...) to initialise your parameter. Something like this:

BaseOptions options = PipelineOptionsFactory.fromArgs(args).as(BaseOptions.class);
// Supply a fixed value where the template would receive a runtime parameter.
options.setSource(ValueProvider.StaticValueProvider.of("/path/to/file"));
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getSource()))
        .apply("just print",
               // ParDo.of is a static factory method; ParDo is not instantiated with new.
               ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                       System.out.println(c.element());
                   }
               }));
p.run();

Upvotes: 1
