Gianluigi

Reputation: 167

How to pass parameter to dataflow template for pipeline construction

I am trying to make an ancestor query like this example and convert it to a template version.

The problem is that the parameter ancestor_id is needed by the function make_query during pipeline construction. If I don't pass it when I create and stage the template, I get RuntimeValueProviderError: RuntimeValueProvider(option: ancestor_id, type: int).get() not called from a runtime context. But if I pass it when creating the template, it seems to become a StaticValueProvider whose value never changes when I execute the template.
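The two behaviours described above can be illustrated with a toy model. The classes below (`ToyRuntimeValueProvider`, `ToyStaticValueProvider`, `NotInRuntimeContext`) are hypothetical stand-ins, not Beam's implementation; they only mimic what the question observes: a runtime provider refuses `.get()` until a runtime context exists, while a static provider returns the value fixed at template creation.

```python
class NotInRuntimeContext(Exception):
    """Hypothetical stand-in for Beam's RuntimeValueProviderError."""


class ToyRuntimeValueProvider:
    # Options supplied when the template is executed; None models
    # "still constructing the pipeline / creating the template".
    runtime_options = None

    def __init__(self, option_name, value_type):
        self.option_name = option_name
        self.value_type = value_type

    def get(self):
        if ToyRuntimeValueProvider.runtime_options is None:
            raise NotInRuntimeContext(
                f"{self.option_name}.get() not called from a runtime context")
        raw = ToyRuntimeValueProvider.runtime_options[self.option_name]
        return self.value_type(raw)


class ToyStaticValueProvider:
    def __init__(self, value_type, value):
        # The value is baked in when the template is created and never changes.
        self.value = value_type(value)

    def get(self):
        return self.value


if __name__ == "__main__":
    runtime_id = ToyRuntimeValueProvider("ancestor_id", int)
    try:
        runtime_id.get()  # construction time: fails, as in the question
    except NotInRuntimeContext as err:
        print("error:", err)

    # Template execution: the option is now supplied.
    ToyRuntimeValueProvider.runtime_options = {"ancestor_id": "7"}
    print(runtime_id.get())

    static_id = ToyStaticValueProvider(int, 42)
    print(static_id.get())  # always the creation-time value
```

The point of the model is that a value can only be either known at construction (static, frozen into the template) or known at run time (runtime, unreadable during construction), never both.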

What is the correct way to pass parameter to template for pipeline construction?

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter

PROJECT_ID = 'my-project'  # placeholder: your GCP project ID
KIND = 'MyKind'            # placeholder: the Datastore kind to query

class TestOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Declared as a ValueProvider so it can be supplied when the template runs.
        parser.add_value_provider_argument('--ancestor_id', type=int)

def make_query(ancestor_id):
    # Build the key of the ancestor entity.
    ancestor = entity_pb2.Key()
    datastore_helper.add_key_path(ancestor, KIND, ancestor_id)
    # Query all entities of KIND that have this ancestor.
    query = query_pb2.Query()
    datastore_helper.set_kind(query, KIND)
    datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor)
    return query

pipeline_options = PipelineOptions()
test_options = pipeline_options.view_as(TestOptions)
with beam.Pipeline(options=pipeline_options) as p:
    # .get() here runs at pipeline construction time; this is the call that
    # raises RuntimeValueProviderError when the template is created.
    entities = p | ReadFromDatastore(PROJECT_ID, make_query(test_options.ancestor_id.get()))

Upvotes: 2

Views: 3617

Answers (1)

greeness

Reputation: 16104

Two problems.

  1. A ValueProvider's get() method can only be called at run time, from a method such as DoFn.process(). Calling it while the pipeline is being constructed raises the error you see. See example.

  2. Further, you are using Google Cloud Datastore IO (a query against Datastore). As of May 2018, the official documentation indicates that Datastore IO does NOT yet accept runtime template parameters.

For Python in particular, the documentation says:

The following connectors accept runtime parameters. File-based IOs: textio, avroio, tfrecordio
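For point 1, the usual pattern is to hand the provider object itself (e.g. `test_options.ancestor_id`, without `.get()`) to a DoFn and resolve it inside `process()`. The sketch below shows only that shape. `RecordingProvider` and `TagWithParamFn` are hypothetical names, and `TagWithParamFn` is a plain class rather than an `apache_beam.DoFn` subclass so that it runs without Beam installed; in a real pipeline you would subclass `beam.DoFn` and apply it with `beam.ParDo(...)`.

```python
class RecordingProvider:
    """Hypothetical stand-in for a ValueProvider that counts get() calls."""

    def __init__(self, value):
        self._value = value
        self.get_calls = 0

    def get(self):
        self.get_calls += 1
        return self._value


class TagWithParamFn:
    """Shape of a DoFn that reads a templated parameter at run time.

    In a real pipeline this would subclass apache_beam.DoFn.
    """

    def __init__(self, param_provider):
        # Keep the provider object; calling .get() here would run at
        # pipeline construction time and fail for a RuntimeValueProvider.
        self.param_provider = param_provider

    def process(self, element):
        # process() executes on the workers at run time, where .get() is allowed.
        yield (element, self.param_provider.get())
```

Constructing `TagWithParamFn(provider)` never touches the value; only iterating `process()` does, which is exactly the split between construction time and run time that templates require.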

A workaround: you can probably first run a query without any templated parameters to get a PCollection of entities. Since any custom transform (for example, a DoFn) can accept a templated parameter and read it at run time, you might then be able to use the parameter as a filter. This depends on your use case, though, and may not apply to you.
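The workaround can be sketched as a run-time filter over entity key paths. This is an in-memory model under stated assumptions: each key is represented as a tuple of `(kind, id)` pairs from root to entity (in a real pipeline these would presumably come from `entity.key.path`), `HasAncestorFn` and `FixedProvider` are hypothetical names, and `HasAncestorFn` would subclass `apache_beam.DoFn` in practice. The templated `ancestor_id` is read only inside `process()`.

```python
class FixedProvider:
    """Hypothetical stand-in for a ValueProvider whose value is known at run time."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class HasAncestorFn:
    """Keeps entities whose key path starts at the given root ancestor.

    In a real pipeline this would subclass apache_beam.DoFn and be applied
    with beam.ParDo after an unparameterised ReadFromDatastore.
    """

    def __init__(self, kind, ancestor_id_provider):
        self.kind = kind
        # Store the provider; resolving it here would happen at construction time.
        self.ancestor_id_provider = ancestor_id_provider

    def process(self, key_path):
        # Resolve the templated parameter at run time only.
        ancestor_path = ((self.kind, self.ancestor_id_provider.get()),)
        # The ancestor key itself also matches, since its path equals ancestor_path.
        if tuple(key_path[:len(ancestor_path)]) == ancestor_path:
            yield key_path
```

Note the cost trade-off: the unparameterised query still reads every matching entity before the filter discards most of them, so this is only reasonable when that read is affordable.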

Upvotes: 4
