Reputation: 1
SourceLocation is the prefix for my Bigtable scan, and it is currently fetched from application.properties. Is there a way to supply it dynamically when running the Dataflow template?
My Pipeline:
pipeline.apply("ReadTable", Read.from(CloudBigtableIO.read(configSetUp(options))))
CloudBigtableScanConfiguration
private static CloudBigtableScanConfiguration configSetUp(LocationSetupOptions options) {
    ValueProvider<Integer> pageFilter = options.getPageFilter();
    Scan scan = new Scan(Bytes.toBytes(options.getSourceLocation().get()));
    FilterList filterList = new FilterList();
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(options.getSourceLocation().get()));
    filterList.addFilter(new PageFilter(Long.valueOf(pageFilter.get())));
    filterList.addFilter(prefixFilter);
    scan.setFilter(filterList);
    return new CloudBigtableScanConfiguration.Builder()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getInstanceId())
        .withTableId(options.getTableId())
        .withScan(scan)
        .build();
}
Upvotes: 0
Views: 187
Reputation: 2825
There are two clients for Bigtable: CloudBigtableIO and BigtableIO. The CloudBigtableIO parameters have not been updated to accept a ValueProvider, so a template cannot set them at run time, whereas BigtableIO is compatible with ValueProviders.
In your particular case, if you need a ValueProvider for use with a template, I would recommend moving to BigtableIO. A sample can be found in AvroToBigtable.
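One thing to plan for when moving over: BigtableIO.read() is configured with row-key ranges (withKeyRange) and Bigtable RowFilters rather than an HBase Scan, so the PrefixFilter from the question would have to be expressed differently. A minimal sketch, assuming the prefix is turned into a [prefix, end) row-key range. PrefixRange and exclusiveEndKey are hypothetical names for illustration, not part of either library:

```java
import java.util.Arrays;

/** Hypothetical helper: derives the exclusive end key of a row-key prefix scan. */
final class PrefixRange {

    /**
     * Returns the smallest key that sorts after every key starting with {@code prefix}.
     * An empty result means "no upper bound" (the prefix is empty or all 0xFF bytes).
     */
    static byte[] exclusiveEndKey(byte[] prefix) {
        int i = prefix.length - 1;
        // Trailing 0xFF bytes cannot be incremented, so drop them.
        while (i >= 0 && prefix[i] == (byte) 0xFF) {
            i--;
        }
        if (i < 0) {
            return new byte[0];
        }
        // Keep everything up to the last incrementable byte and bump it by one.
        byte[] end = Arrays.copyOf(prefix, i + 1);
        end[i]++;
        return end;
    }
}
```

The start key is the prefix itself. Both byte arrays could then be wrapped with ByteKey.copyFrom(...) to build a ByteKeyRange, where an empty end key means the range is unbounded. To defer the computation until the template actually runs, it would need to sit inside a NestedValueProvider over getSourceLocation() rather than call .get() while the pipeline is being constructed.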
The @Default.InstanceFactory annotation can be used to specify a user-provided factory class that generates the default value for a parameter. With this, you could read the default value from a resource file inside your DefaultValueFactory implementation.
As an example, you can check how WindowedWordCount defines DefaultToCurrentSystemTime and uses it to annotate the minTimestampMillis parameter.
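The resource-file idea can be sketched in plain Java. This is only an illustration of the lookup that a DefaultValueFactory's create(PipelineOptions) method might delegate to; ResourceDefaults, readDefault, and the "sourceLocation" key are hypothetical names, and the Beam wiring itself is left out so the snippet stays self-contained:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Hypothetical helper: looks up a default option value in a properties stream. */
final class ResourceDefaults {

    /**
     * Reads {@code key} from the given properties stream.
     * Returns {@code fallback} if the stream is null, unreadable, or lacks the key.
     */
    static String readDefault(InputStream in, String key, String fallback) {
        if (in == null) {
            // getResourceAsStream returns null when the resource is not on the classpath.
            return fallback;
        }
        Properties props = new Properties();
        try (InputStream stream = in) {
            props.load(stream);
        } catch (IOException e) {
            return fallback;
        }
        return props.getProperty(key, fallback);
    }
}
```

Inside a factory, the call might look like readDefault(MyFactory.class.getResourceAsStream("/application.properties"), "sourceLocation", ""), with MyFactory standing in for your own class.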
Upvotes: 0