Ananth T

Reputation: 1

Google Cloud Dataflow: CloudBigtableScanConfiguration.withScan(), how to pass dynamic filter values?

SourceLocation is the row-key prefix for my Bigtable scan, and it is currently fetched from application.properties. Is there a way to supply it dynamically when running the Dataflow template?

My Pipeline:

pipeline.apply("ReadTable", Read.from(CloudBigtableIO.read(configSetUp(options))));

CloudBigtableScanConfiguration:

private static CloudBigtableScanConfiguration configSetUp(LocationSetupOptions options) {
    ValueProvider<Integer> pageFilter = options.getPageFilter();
    // get() is evaluated here, during pipeline construction; a runtime-only
    // template parameter is not yet available at this point.
    byte[] prefix = Bytes.toBytes(options.getSourceLocation().get());
    Scan scan = new Scan(prefix); // start the scan at the prefix

    // The default FilterList operator is MUST_PASS_ALL, so both filters apply.
    FilterList filterList = new FilterList();
    filterList.addFilter(new PageFilter(pageFilter.get().longValue()));
    filterList.addFilter(new PrefixFilter(prefix));
    scan.setFilter(filterList);

    return new CloudBigtableScanConfiguration.Builder()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getInstanceId())
        .withTableId(options.getTableId())
        .withScan(scan)
        .build();
}

Upvotes: 0

Views: 187

Answers (1)

Jayadeep Jayaraman

Reputation: 2825

There are two Bigtable connectors for Dataflow: CloudBigtableIO and BigtableIO. CloudBigtableIO's parameters have not been updated to accept a ValueProvider, so a template cannot change them at run time, whereas BigtableIO is compatible with ValueProviders.

In your case, if you need ValueProviders so that the values can be supplied when the template runs, I would recommend moving to BigtableIO. A sample can be found in AvroToBigtable.
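If you move to BigtableIO, the HBase PrefixFilter from the question has to be expressed some other way. One common approach is a row-key range: Bigtable sorts row keys as unsigned bytes, so every key with a given prefix falls in the half-open range from the prefix to the prefix with its last byte incremented. The helper below is a minimal, Beam-free sketch of that conversion; the class name PrefixRange and its methods are hypothetical, not part of any library.

```java
import java.util.Arrays;

/** Hypothetical helper: turns a row-key prefix into a [start, end) key range. */
final class PrefixRange {
    private PrefixRange() {}

    /** The start of a prefix scan is the prefix itself (returned as a copy). */
    static byte[] startKey(byte[] prefix) {
        return Arrays.copyOf(prefix, prefix.length);
    }

    /**
     * Smallest key that sorts after every key beginning with the prefix.
     * Trailing 0xFF bytes cannot be incremented, so they are dropped first.
     * Returns an empty array when no finite end exists (empty or all-0xFF prefix).
     */
    static byte[] endKey(byte[] prefix) {
        int i = prefix.length - 1;
        while (i >= 0 && prefix[i] == (byte) 0xFF) {
            i--;
        }
        if (i < 0) {
            return new byte[0];
        }
        byte[] end = Arrays.copyOf(prefix, i + 1);
        end[i]++; // increment the last byte that is not 0xFF
        return end;
    }
}
```

In a pipeline, the two arrays could be wrapped with Beam's ByteKey.copyFrom and ByteKeyRange.of and passed to BigtableIO.read().withKeyRanges(...). As far as I know that method also has a ValueProvider overload, so the range could be derived from the runtime prefix with ValueProvider.NestedValueProvider, but check this against the Beam version you are using.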

UPDATE

The @Default.InstanceFactory annotation can be used to specify a user-provided factory that generates the default value for a parameter. With this, you could read the default value from a resource file inside your DefaultValueFactory implementation.

As an example, you can check how WindowedWordCount defines DefaultToCurrentSystemTime and uses it to annotate the minTimestampMillis parameter.
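The resource-file lookup that such a factory would perform can be sketched with the standard library alone. This is only an illustration: the class name PropertyDefaults, the property key sourceLocation, and the fallback values are assumptions, not anything defined by Beam or by the question's project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: looks up a default value in a .properties resource. */
final class PropertyDefaults {
    private PropertyDefaults() {}

    /** Reads the key from the stream; returns the fallback if the stream or key is missing. */
    static String fromStream(InputStream in, String key, String fallback) {
        if (in == null) {
            return fallback;
        }
        Properties props = new Properties();
        try (InputStream stream = in) {
            props.load(stream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key, fallback);
    }

    /** Same lookup, with the file resolved from the classpath (e.g. application.properties). */
    static String fromClasspath(String resource, String key, String fallback) {
        InputStream in = PropertyDefaults.class.getClassLoader().getResourceAsStream(resource);
        return fromStream(in, key, fallback);
    }
}
```

A DefaultValueFactory<String> implementation could then return something like PropertyDefaults.fromClasspath("application.properties", "sourceLocation", "") from its create(PipelineOptions) method, so the value in the file acts as the default while a value passed on the command line still takes precedence.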

Upvotes: 0
