Reputation: 143
I created a pipeline with ValueProviders so that I can use it as a template. However, I can't figure out how to supply ValueProviders when testing the pipeline. I can't pass plain values in the test because my PTransforms expect ValueProviders.
Upvotes: 2
Views: 763
Reputation: 143
I set default values for the ValueProviders in my pipeline options:
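from apache_beam.options.pipeline_options import PipelineOptions

class MyPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Registers --variable as a ValueProvider argument with a default value.
        parser.add_value_provider_argument('--variable',
                                           type=float,
                                           dest='variable',
                                           default=5)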
That way, when I instantiate MyPipelineOptions in the test file, the default values are used automatically.
Upvotes: 0
Reputation: 59
I do not know about Python, but in Java you can use StaticValueProvider. For example, if you have the following interface for options:
interface BaseOptions extends DataflowPipelineOptions {
    void setSource(ValueProvider<String> source);
    ValueProvider<String> getSource();
}
Then you can use ValueProvider.StaticValueProvider.of(...) to initialise your parameter. Something like this:
BaseOptions options = PipelineOptionsFactory.fromArgs(args).as(BaseOptions.class);
// Wrap a fixed value so the option behaves like a runtime-supplied ValueProvider.
options.setSource(ValueProvider.StaticValueProvider.of("/path/to/file"));
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getSource()))
 .apply("just print",
        // ParDo.of is a static factory method; ParDo is not instantiated with new.
        ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println(c.element());
            }
        }));
p.run();
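For the Python SDK that the question is about, the same idea can be sketched roughly as follows. This is only an illustration and it makes some assumptions: `FakeStaticValueProvider` and `multiply_by` are hypothetical names invented here, and the fake class only mirrors the `get()` / `is_accessible()` surface of a Beam ValueProvider so that the sketch runs without Beam installed. With Beam available, `apache_beam.options.value_provider.StaticValueProvider(float, 5)` should be usable in its place.

```python
class FakeStaticValueProvider:
    """Hypothetical stand-in that mirrors the get()/is_accessible() surface
    of a Beam ValueProvider. With Beam installed, StaticValueProvider from
    apache_beam.options.value_provider could be passed instead."""

    def __init__(self, value_type, value):
        self.value_type = value_type
        # Coerce once, so get() always returns the declared type.
        self.value = value_type(value)

    def is_accessible(self):
        # A static value is always available, even before the pipeline runs.
        return True

    def get(self):
        return self.value


def multiply_by(element, factor_provider):
    """Hypothetical per-element logic of a transform that takes a provider.
    The provider is resolved only when an element is processed."""
    return element * factor_provider.get()


# In a test, wrap the plain value instead of parsing command-line options.
factor = FakeStaticValueProvider(float, 5)
results = [multiply_by(x, factor) for x in [1, 2, 3]]
print(results)
```

The point of the design is that the transform only ever calls `get()` on what it receives, so a test can hand it a fixed, already-resolved provider.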
Upvotes: 1